查看源代码 使用 ETS 加速

每次我们需要查找一个桶时,都需要向注册表发送一条消息。如果我们的注册表被多个进程同时访问,注册表可能会成为瓶颈!

在本章中,我们将学习 ETS (Erlang Term Storage) 以及如何将其用作缓存机制。

警告!不要过早地使用 ETS 作为缓存!记录和分析您的应用程序性能,并找出哪些部分是瓶颈,以便您知道是否应该缓存以及应该缓存什么。本章仅仅是 ETS 用法的示例,一旦确定了需求,就可以使用它。

ETS 作为缓存

ETS 允许我们将任何 Elixir 项存储在内存中的表中。使用 ETS 表是通过 Erlang 的 :ets 模块 进行的。

iex> table = :ets.new(:buckets_registry, [:set, :protected])
#Reference<0.1885502827.460455937.234656>
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]

创建 ETS 表时,需要两个参数:表名和一组选项。从可用选项中,我们传递了表类型及其访问规则。我们选择了 :set 类型,这意味着键不能重复。我们还将表的访问权限设置为 :protected,这意味着只有创建表的进程才能写入它,但所有进程都可以从中读取。可能的访问控制

:public - 所有进程都可以读写。

:protected - 所有进程都可以读取。只能由拥有进程写入。这是默认值。

:private - 读写权限仅限于拥有进程。

请注意,如果您的读写调用违反了访问控制,操作将引发 ArgumentError。最后,由于 :set:protected 是默认值,所以我们从现在起将跳过它们。

ETS 表也可以命名,允许我们通过给定名称访问它们。

iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]

让我们更改 KV.Registry 以使用 ETS 表。第一个更改是修改我们的注册表以要求一个名称参数,我们将使用它来命名 ETS 表和注册表进程本身。ETS 名称和进程名称存储在不同的位置,因此不会发生冲突。

打开 lib/kv/registry.ex,让我们更改其实现。我们在源代码中添加了注释以突出显示我们所做的更改。

defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry with the given options.

  `:name` is always required.
  """
  def start_link(opts) do
    # 1. Pass the name to GenServer's init
    server = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, server, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    # 2. Lookup is now done directly in ETS, without accessing the server
    case :ets.lookup(server, name) do
      [{^name, pid}] -> {:ok, pid}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  @impl true
  def init(table) do
    # 3. We have replaced the names map by the ETS table
    names = :ets.new(table, [:named_table, read_concurrency: true])
    refs  = %{}
    {:ok, {names, refs}}
  end

  # 4. The previous handle_call callback for lookup was removed

  @impl true
  def handle_cast({:create, name}, {names, refs}) do
    # 5. Read and write to the ETS table instead of the map
    case lookup(names, name) do
      {:ok, _pid} ->
        {:noreply, {names, refs}}

      :error ->
        {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
        ref = Process.monitor(pid)
        refs = Map.put(refs, ref, name)
        :ets.insert(names, {name, pid})
        {:noreply, {names, refs}}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    # 6. Delete from the ETS table instead of the map
    {name, refs} = Map.pop(refs, ref)
    :ets.delete(names, name)
    {:noreply, {names, refs}}
  end

  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

请注意,在我们更改之前,KV.Registry.lookup/2 将请求发送到服务器,但现在它直接从 ETS 表中读取,该表在所有进程中共享。这就是我们正在实现的缓存机制背后的主要思想。

为了使缓存机制正常工作,创建的 ETS 表需要具有 :protected 访问权限(默认值),因此所有客户端都可以从中读取,而只有 KV.Registry 进程可以写入它。我们在启动表时还设置了 read_concurrency: true,为并发读取操作的常见场景优化表。

我们上面所做的更改破坏了我们的测试,因为注册表在启动时需要 :name 选项。此外,一些注册表操作(例如 lookup/2)需要将名称作为参数而不是 PID 提供,以便我们可以进行 ETS 表查找。让我们更改 test/kv/registry_test.exs 中的设置函数以解决这两个问题。

  setup context do
    _ = start_supervised!({KV.Registry, name: context.test})
    %{registry: context.test}
  end

由于每个测试都有一个唯一的名称,我们使用测试名称来命名我们的注册表。这样,我们就不再需要传递注册表 PID,而是通过测试名称来识别它。另请注意,我们将 start_supervised! 的结果分配给下划线 (_)。这种习惯用法通常用于表示我们对 start_supervised! 的结果不感兴趣。

更改 setup 后,一些测试将继续失败。您甚至可能会注意到测试在不同运行之间通过和失败。例如,“生成桶”测试

test "spawns buckets", %{registry: registry} do
  assert KV.Registry.lookup(registry, "shopping") == :error

  KV.Registry.create(registry, "shopping")
  assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

  KV.Bucket.put(bucket, "milk", 1)
  assert KV.Bucket.get(bucket, "milk") == 1
end

可能在这一行失败

{:ok, bucket} = KV.Registry.lookup(registry, "shopping")

如果我们在前一行刚创建了桶,这一行怎么会失败呢?

出现这些故障的原因是,为了教学目的,我们犯了两个错误。

  1. 我们过早地优化(通过添加这个缓存层)
  2. 我们正在使用 cast/2(而我们应该使用 call/2

竞争条件?

在 Elixir 中开发不会使您的代码免受竞争条件。但是,Elixir 的抽象(其中默认情况下没有共享任何内容)使更容易发现竞争条件的根本原因。

在我们的测试中发生的是,在操作和我们可以在 ETS 表中观察到此更改的时间之间存在延迟。以下是我们期望发生的事情。

  1. 我们调用 KV.Registry.create(registry, "shopping")
  2. 注册表创建桶并更新缓存表
  3. 我们使用 KV.Registry.lookup(registry, "shopping") 从表中访问信息
  4. 上面的命令返回 {:ok, bucket}

但是,由于 KV.Registry.create/2 是一个 cast 操作,因此该命令将在我们实际写入表之前返回!换句话说,正在发生的是

  1. 我们调用 KV.Registry.create(registry, "shopping")
  2. 我们使用 KV.Registry.lookup(registry, "shopping") 从表中访问信息
  3. 上面的命令返回 :error
  4. 注册表创建桶并更新缓存表

为了解决此故障,我们需要使用 call/2 而不是 cast/2,使 KV.Registry.create/2 同步。这将保证客户端只有在对表进行更改后才能继续。让我们回到 lib/kv/registry.ex 并更改函数及其回调,如下所示

def create(server, name) do
  GenServer.call(server, {:create, name})
end
@impl true
def handle_call({:create, name}, _from, {names, refs}) do
  case lookup(names, name) do
    {:ok, pid} ->
      {:reply, pid, {names, refs}}

    :error ->
      {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      :ets.insert(names, {name, pid})
      {:reply, pid, {names, refs}}
  end
end

我们将回调从 handle_cast/2 更改为 handle_call/3,并将其更改为回复创建的桶的 PID。一般来说,Elixir 开发者更喜欢使用 call/2 而不是 cast/2,因为它还提供反压——您会阻塞,直到收到回复。在没有必要的情况下使用 cast/2 也可以被认为是过早优化。

让我们再次运行测试。这次,我们将传递 --trace 选项

$ mix test --trace

--trace 选项在您的测试死锁或存在竞争条件时非常有用,因为它同步运行所有测试 (async: true 无效) 并显示有关每个测试的详细信息。如果您多次运行测试,您可能会看到这个间歇性故障

  1) test removes buckets on exit (KV.RegistryTest)
     test/kv/registry_test.exs:19
     Assertion with == failed
     code:  assert KV.Registry.lookup(registry, "shopping") == :error
     left:  {:ok, #PID<0.109.0>}
     right: :error
     stacktrace:
       test/kv/registry_test.exs:23

根据故障消息,我们期望桶不再存在于表中,但它仍然存在!这个问题与我们刚刚解决的问题相反:以前在创建桶的命令和更新表之间存在延迟,现在在桶进程死亡和它的条目从表中删除之间存在延迟。由于这是一个竞争条件,您可能无法在您的机器上重现它,但它就在那里。

上次我们通过将异步操作 (一个 cast) 替换为同步操作 (一个 call) 来解决竞争条件。不幸的是,我们正在使用的 handle_info/2 回调来接收 :DOWN 消息并从 ETS 表中删除条目没有同步等效项。这次,我们需要找到一种方法来保证注册表已处理桶进程终止时发送的 :DOWN 通知。

一个简单的方法是在执行桶查找之前向注册表发送同步请求。 Agent.stop/2 操作是同步的,只有在桶进程终止后才返回。因此,一旦 Agent.stop/2 返回,注册表就会收到 :DOWN 消息,但它可能尚未处理它。为了保证处理 :DOWN 消息,我们可以执行同步请求。由于消息按顺序处理,一旦注册表回复同步请求,那么 :DOWN 消息肯定会被处理。

让我们通过在 Agent.stop/2 之后创建“伪造”桶(这是一个同步请求),在 test/kv/registry_test.exs 中的两个“删除”测试中做到这一点。

  test "removes buckets on exit", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    Agent.stop(bucket)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus")
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

  test "removes bucket on crash", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Stop the bucket with non-normal reason
    Agent.stop(bucket, :shutdown)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus")
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

我们的测试现在应该(总是)通过!

这结束了我们的优化章节。我们已经将 ETS 用作缓存机制,其中读取可以从任何进程发生,但写入仍然通过单个进程进行序列化。更重要的是,我们还了解到,一旦数据可以异步读取,我们就需要意识到它可能带来的竞争条件。

实际上,如果您发现自己需要一个动态进程的注册表,您应该使用 Elixir 提供的 Registry 模块。它提供了类似于我们使用 GenServer + :ets 创建的功能,同时还能并发地执行写入和读取。 它已被证明可以在所有核心上扩展,即使在拥有 40 个核心的机器上也是如此

接下来,让我们讨论外部和内部依赖项以及 Mix 如何帮助我们管理大型代码库。