查看源代码 使用 ETS 加速
每次我们需要查找一个桶时,都需要向注册表发送一条消息。如果我们的注册表被多个进程同时访问,注册表可能会成为瓶颈!
在本章中,我们将学习 ETS (Erlang Term Storage) 以及如何将其用作缓存机制。
警告!不要过早地使用 ETS 作为缓存!记录和分析您的应用程序性能,并找出哪些部分是瓶颈,以便您知道是否应该缓存以及应该缓存什么。本章仅仅是 ETS 用法的示例,一旦确定了需求,就可以使用它。
ETS 作为缓存
ETS 允许我们将任何 Elixir 项存储在内存中的表中。使用 ETS 表是通过 Erlang 的 :ets
模块 进行的。
iex> table = :ets.new(:buckets_registry, [:set, :protected])
#Reference<0.1885502827.460455937.234656>
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]
创建 ETS 表时,需要两个参数:表名和一组选项。从可用选项中,我们传递了表类型及其访问规则。我们选择了 :set
类型,这意味着键不能重复。我们还将表的访问权限设置为 :protected
,这意味着只有创建表的进程才能写入它,但所有进程都可以从中读取。可能的访问控制
:public
- 所有进程都可以读写。
:protected
- 所有进程都可以读取。只能由拥有进程写入。这是默认值。
:private
- 读写权限仅限于拥有进程。
请注意,如果您的读写调用违反了访问控制,操作将引发 ArgumentError
。最后,由于 :set
和 :protected
是默认值,所以我们从现在起将跳过它们。
ETS 表也可以命名,允许我们通过给定名称访问它们。
iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]
让我们更改 KV.Registry
以使用 ETS 表。第一个更改是修改我们的注册表以要求一个名称参数,我们将使用它来命名 ETS 表和注册表进程本身。ETS 名称和进程名称存储在不同的位置,因此不会发生冲突。
打开 lib/kv/registry.ex
,让我们更改其实现。我们在源代码中添加了注释以突出显示我们所做的更改。
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry with the given options.
`:name` is always required.
"""
def start_link(opts) do
# 1. Pass the name to GenServer's init
server = Keyword.fetch!(opts, :name)
GenServer.start_link(__MODULE__, server, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
"""
def lookup(server, name) do
# 2. Lookup is now done directly in ETS, without accessing the server
case :ets.lookup(server, name) do
[{^name, pid}] -> {:ok, pid}
[] -> :error
end
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
@impl true
def init(table) do
# 3. We have replaced the names map by the ETS table
names = :ets.new(table, [:named_table, read_concurrency: true])
refs = %{}
{:ok, {names, refs}}
end
# 4. The previous handle_call callback for lookup was removed
@impl true
def handle_cast({:create, name}, {names, refs}) do
# 5. Read and write to the ETS table instead of the map
case lookup(names, name) do
{:ok, _pid} ->
{:noreply, {names, refs}}
:error ->
{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
:ets.insert(names, {name, pid})
{:noreply, {names, refs}}
end
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
# 6. Delete from the ETS table instead of the map
{name, refs} = Map.pop(refs, ref)
:ets.delete(names, name)
{:noreply, {names, refs}}
end
@impl true
def handle_info(_msg, state) do
{:noreply, state}
end
end
请注意,在我们更改之前,KV.Registry.lookup/2
将请求发送到服务器,但现在它直接从 ETS 表中读取,该表在所有进程中共享。这就是我们正在实现的缓存机制背后的主要思想。
为了使缓存机制正常工作,创建的 ETS 表需要具有 :protected
访问权限(默认值),因此所有客户端都可以从中读取,而只有 KV.Registry
进程可以写入它。我们在启动表时还设置了 read_concurrency: true
,为并发读取操作的常见场景优化表。
我们上面所做的更改破坏了我们的测试,因为注册表在启动时需要 :name
选项。此外,一些注册表操作(例如 lookup/2
)需要将名称作为参数而不是 PID 提供,以便我们可以进行 ETS 表查找。让我们更改 test/kv/registry_test.exs
中的设置函数以解决这两个问题。
setup context do
_ = start_supervised!({KV.Registry, name: context.test})
%{registry: context.test}
end
由于每个测试都有一个唯一的名称,我们使用测试名称来命名我们的注册表。这样,我们就不再需要传递注册表 PID,而是通过测试名称来识别它。另请注意,我们将 start_supervised!
的结果分配给下划线 (_
)。这种习惯用法通常用于表示我们对 start_supervised!
的结果不感兴趣。
更改 setup
后,一些测试将继续失败。您甚至可能会注意到测试在不同运行之间通过和失败。例如,“生成桶”测试
test "spawns buckets", %{registry: registry} do
assert KV.Registry.lookup(registry, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
可能在这一行失败
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
如果我们在前一行刚创建了桶,这一行怎么会失败呢?
出现这些故障的原因是,为了教学目的,我们犯了两个错误。
- 我们过早地优化(通过添加这个缓存层)
- 我们正在使用
cast/2
(而我们应该使用call/2
)
竞争条件?
在 Elixir 中开发不会使您的代码免受竞争条件。但是,Elixir 的抽象(其中默认情况下没有共享任何内容)使更容易发现竞争条件的根本原因。
在我们的测试中发生的是,在操作和我们可以在 ETS 表中观察到此更改的时间之间存在延迟。以下是我们期望发生的事情。
- 我们调用
KV.Registry.create(registry, "shopping")
- 注册表创建桶并更新缓存表
- 我们使用
KV.Registry.lookup(registry, "shopping")
从表中访问信息 - 上面的命令返回
{:ok, bucket}
但是,由于 KV.Registry.create/2
是一个 cast 操作,因此该命令将在我们实际写入表之前返回!换句话说,正在发生的是
- 我们调用
KV.Registry.create(registry, "shopping")
- 我们使用
KV.Registry.lookup(registry, "shopping")
从表中访问信息 - 上面的命令返回
:error
- 注册表创建桶并更新缓存表
为了解决此故障,我们需要使用 call/2
而不是 cast/2
,使 KV.Registry.create/2
同步。这将保证客户端只有在对表进行更改后才能继续。让我们回到 lib/kv/registry.ex
并更改函数及其回调,如下所示
def create(server, name) do
GenServer.call(server, {:create, name})
end
@impl true
def handle_call({:create, name}, _from, {names, refs}) do
case lookup(names, name) do
{:ok, pid} ->
{:reply, pid, {names, refs}}
:error ->
{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
:ets.insert(names, {name, pid})
{:reply, pid, {names, refs}}
end
end
我们将回调从 handle_cast/2
更改为 handle_call/3
,并将其更改为回复创建的桶的 PID。一般来说,Elixir 开发者更喜欢使用 call/2
而不是 cast/2
,因为它还提供反压——您会阻塞,直到收到回复。在没有必要的情况下使用 cast/2
也可以被认为是过早优化。
让我们再次运行测试。这次,我们将传递 --trace
选项
$ mix test --trace
--trace
选项在您的测试死锁或存在竞争条件时非常有用,因为它同步运行所有测试 (async: true
无效) 并显示有关每个测试的详细信息。如果您多次运行测试,您可能会看到这个间歇性故障
1) test removes buckets on exit (KV.RegistryTest)
test/kv/registry_test.exs:19
Assertion with == failed
code: assert KV.Registry.lookup(registry, "shopping") == :error
left: {:ok, #PID<0.109.0>}
right: :error
stacktrace:
test/kv/registry_test.exs:23
根据故障消息,我们期望桶不再存在于表中,但它仍然存在!这个问题与我们刚刚解决的问题相反:以前在创建桶的命令和更新表之间存在延迟,现在在桶进程死亡和它的条目从表中删除之间存在延迟。由于这是一个竞争条件,您可能无法在您的机器上重现它,但它就在那里。
上次我们通过将异步操作 (一个 cast
) 替换为同步操作 (一个 call
) 来解决竞争条件。不幸的是,我们正在使用的 handle_info/2
回调来接收 :DOWN
消息并从 ETS 表中删除条目没有同步等效项。这次,我们需要找到一种方法来保证注册表已处理桶进程终止时发送的 :DOWN
通知。
一个简单的方法是在执行桶查找之前向注册表发送同步请求。 Agent.stop/2
操作是同步的,只有在桶进程终止后才返回。因此,一旦 Agent.stop/2
返回,注册表就会收到 :DOWN
消息,但它可能尚未处理它。为了保证处理 :DOWN
消息,我们可以执行同步请求。由于消息按顺序处理,一旦注册表回复同步请求,那么 :DOWN
消息肯定会被处理。
让我们通过在 Agent.stop/2
之后创建“伪造”桶(这是一个同步请求),在 test/kv/registry_test.exs
中的两个“删除”测试中做到这一点。
test "removes buckets on exit", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
Agent.stop(bucket)
# Do a call to ensure the registry processed the DOWN message
_ = KV.Registry.create(registry, "bogus")
assert KV.Registry.lookup(registry, "shopping") == :error
end
test "removes bucket on crash", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
# Stop the bucket with non-normal reason
Agent.stop(bucket, :shutdown)
# Do a call to ensure the registry processed the DOWN message
_ = KV.Registry.create(registry, "bogus")
assert KV.Registry.lookup(registry, "shopping") == :error
end
我们的测试现在应该(总是)通过!
这结束了我们的优化章节。我们已经将 ETS 用作缓存机制,其中读取可以从任何进程发生,但写入仍然通过单个进程进行序列化。更重要的是,我们还了解到,一旦数据可以异步读取,我们就需要意识到它可能带来的竞争条件。
实际上,如果您发现自己需要一个动态进程的注册表,您应该使用 Elixir 提供的 Registry
模块。它提供了类似于我们使用 GenServer + :ets
创建的功能,同时还能并发地执行写入和读取。 它已被证明可以在所有核心上扩展,即使在拥有 40 个核心的机器上也是如此。
接下来,让我们讨论外部和内部依赖项以及 Mix 如何帮助我们管理大型代码库。