查看源代码 PartitionSupervisor (Elixir v1.16.2)
一个启动相同子进程的多个分区的监管程序。
某些进程在大型系统中可能会成为瓶颈。如果这些进程可以轻松地对其状态进行分区,并且它们之间没有依赖关系,那么它们可以使用 PartitionSupervisor
创建多个隔离和独立的分区。
启动 PartitionSupervisor
后,您可以使用 {:via, PartitionSupervisor, {name, key}}
向其子进程发送消息,其中 name
是 PartitionSupervisor
的名称,key 用于路由。
该模块在 Elixir v1.14.0 中引入。
简单示例
让我们从一个本身并不实用的示例开始,但它展示了如何启动分区以及如何将消息路由到它们。
这是一个简单的 GenServer,它只是收集它接收到的消息。它会打印消息以方便说明。
defmodule Collector do
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
def init(args) do
IO.inspect([__MODULE__, " got args ", args, " in ", self()])
{:ok, _initial_state = []}
end
def collect(server, msg) do
GenServer.call(server, {:collect, msg})
end
def handle_call({:collect, msg}, _from, state) do
new_state = [msg | state]
IO.inspect(["current messages:", new_state, " in process", self()])
{:reply, :ok, new_state}
end
end
要运行多个此类进程,我们可以在我们的监管树中将它们启动在一个 PartitionSupervisor
下。
{PartitionSupervisor,
child_spec: Collector.child_spec([some: :arg]),
name: MyApp.PartitionSupervisor
}
我们可以使用“via 元组”向它们发送消息。
# The key is used to route our message to a particular instance.
key = 1
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :hi)
# ["current messages:", [:hi], " in process", #PID<0.602.0>]
:ok
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :ho)
# ["current messages:", [:ho, :hi], " in process", #PID<0.602.0>]
:ok
# With a different key, the message will be routed to a different instance.
key = 2
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :a)
# ["current messages:", [:a], " in process", #PID<0.603.0>]
:ok
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :b)
# ["current messages:", [:b, :a], " in process", #PID<0.603.0>]
:ok
现在让我们继续一个有用的示例。
DynamicSupervisor
示例
DynamicSupervisor
是一个负责启动其他进程的单个进程。在某些应用程序中,DynamicSupervisor
可能会成为瓶颈。为了解决这个问题,您可以通过 PartitionSupervisor
启动 DynamicSupervisor
的多个实例,然后选择一个“随机”实例来启动子进程。
不要启动单个 DynamicSupervisor
children = [
{DynamicSupervisor, name: MyApp.DynamicSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
并直接在该动态监管程序上启动子进程
DynamicSupervisor.start_child(MyApp.DynamicSupervisor, {Agent, fn -> %{} end})
您可以将动态监管程序启动在 PartitionSupervisor
下。
children = [
{PartitionSupervisor,
child_spec: DynamicSupervisor,
name: MyApp.DynamicSupervisors}
]
Supervisor.start_link(children, strategy: :one_for_one)
然后
DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {MyApp.DynamicSupervisors, self()}},
{Agent, fn -> %{} end}
)
在上面的代码中,我们启动了一个分区监管程序,它默认情况下将为您的机器中的每个核心启动一个动态监管程序。然后,您不是按名称调用 DynamicSupervisor
,而是使用 {:via, PartitionSupervisor, {name, key}}
格式通过分区监管程序调用它。我们选择了 self()
作为路由键,这意味着每个进程将被分配一个现有的动态监管程序。请参阅 start_link/1
以查看 PartitionSupervisor
支持的所有选项。
实现说明
PartitionSupervisor
使用 ETS 表或 Registry
来管理所有分区。在内部,PartitionSupervisor
为每个分区生成一个子进程规范,然后充当常规监管程序。每个子进程规范的 ID 都是分区号。
对于路由,使用两种策略。如果 key
是一个整数,则使用 rem(abs(key), partitions)
进行路由,其中 partitions
是分区数。否则,它使用 :erlang.phash2(key, partitions)
。特定的路由将来可能会更改,因此不能依赖它。如果您想检索某个键的特定 PID,可以使用 GenServer.whereis({:via, PartitionSupervisor, {name, key}})
。
总结
类型
函数
@spec count_children(name()) :: %{ specs: non_neg_integer(), active: non_neg_integer(), supervisors: non_neg_integer(), workers: non_neg_integer() }
返回包含监管程序计数值的映射。
该映射包含以下键
:specs
- 分区(子进程)数量:active
- 此监管程序管理的所有正在运行的子进程的计数:supervisors
- 所有监管程序的计数,无论子进程是否仍然存活:workers
- 所有工作进程的计数,无论子进程是否仍然存活
@spec partitions(name()) :: pos_integer()
返回分区监管程序的分区数。
@spec start_link(keyword()) :: Supervisor.on_start()
使用给定的选项启动分区监管程序。
此函数通常不会直接调用,而是当使用 PartitionSupervisor
作为另一个监管程序的子进程时调用
children = [
{PartitionSupervisor, child_spec: SomeChild, name: MyPartitionSupervisor}
]
如果监管程序成功生成,此函数返回 {:ok, pid}
,其中 pid
是监管程序的 PID。如果分区监管程序的给定名称已分配给一个进程,则该函数返回 {:error, {:already_started, pid}}
,其中 pid
是该进程的 PID。
请注意,使用此函数启动的监管程序与父进程链接,并且不仅在崩溃时退出,而且在父进程以 :normal
原因退出时也会退出。
选项
:name
- 一个原子或 via 元组,表示分区监管程序的名称(参见name/0
)。:child_spec
- 启动分区时要使用的子进程规范。:partitions
- 一个正整数,表示分区数。默认为System.schedulers_online()
(通常是核心数)。:strategy
- 重启策略选项,默认为:one_for_one
。您可以在Supervisor
模块文档中了解有关策略的更多信息。:max_restarts
- 一段时间内允许的最大重启次数。默认为3
。:max_seconds
-:max_restarts
适用的时间范围。默认为5
。:with_arguments
- 一个有两个参数的匿名函数,允许将分区传递给子进程启动函数。请参见下面的:with_arguments
部分。
:with_arguments
有时您希望每个分区都知道它们分配的分区号。这可以通过 :with_arguments
选项来完成。此函数接收 :child_spec
选项的值和一个整数作为分区号。它必须返回一个新的参数列表,该列表将用于启动分区进程。
例如,大多数进程通过调用 start_link(opts)
启动,其中 opts
是一个关键字列表。您可以将分区注入到传递给子进程的选项中。
with_arguments: fn [opts], partition ->
[Keyword.put(opts, :partition, partition)]
end
使用给定的 reason
同步停止给定的分区监管程序。
如果监管程序以给定的原因终止,它返回 :ok
。如果它以其他原因终止,则调用退出。
此函数保留 OTP 关于错误报告的语义。如果原因不是 :normal
、:shutdown
或 {:shutdown, _}
,则会记录错误报告。
@spec which_children(name()) :: [ {:undefined, pid() | :restarting, :worker | :supervisor, [module()] | :dynamic} ]
返回一个包含所有子进程信息的列表。
此函数返回一个包含以下元组的列表
id
- 分区号child
- 对应子进程的 PID,如果该进程即将重启,则为原子:restarting
type
-:worker
或:supervisor
,如子进程规范中定义modules
- 如子进程规范中定义