查看源代码 PartitionSupervisor (Elixir v1.16.2)

一个启动相同子进程的多个分区的监管程序。

某些进程在大型系统中可能会成为瓶颈。如果这些进程可以轻松地对其状态进行分区,并且它们之间没有依赖关系,那么它们可以使用 PartitionSupervisor 创建多个隔离和独立的分区。

启动 PartitionSupervisor 后,您可以使用 {:via, PartitionSupervisor, {name, key}} 向其子进程发送消息,其中 namePartitionSupervisor 的名称,key 用于路由。

该模块在 Elixir v1.14.0 中引入。

简单示例

让我们从一个本身并不实用的示例开始,但它展示了如何启动分区以及如何将消息路由到它们。

这是一个简单的 GenServer,它只是收集它接收到的消息。它会打印消息以方便说明。

defmodule Collector do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  def init(args) do
    IO.inspect([__MODULE__, " got args ", args, " in ", self()])
    {:ok, _initial_state = []}
  end

  def collect(server, msg) do
    GenServer.call(server, {:collect, msg})
  end

  def handle_call({:collect, msg}, _from, state) do
    new_state = [msg | state]
    IO.inspect(["current messages:", new_state, " in process", self()])
    {:reply, :ok, new_state}
  end
end

要运行多个此类进程,我们可以在我们的监管树中将它们启动在一个 PartitionSupervisor 下。

{PartitionSupervisor,
  child_spec: Collector.child_spec([some: :arg]),
  name: MyApp.PartitionSupervisor
}

我们可以使用“via 元组”向它们发送消息。

# The key is used to route our message to a particular instance.
key = 1
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :hi)
# ["current messages:", [:hi], " in process", #PID<0.602.0>]
:ok
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :ho)
# ["current messages:", [:ho, :hi], " in process", #PID<0.602.0>]
:ok

# With a different key, the message will be routed to a different instance.
key = 2
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :a)
# ["current messages:", [:a], " in process", #PID<0.603.0>]
:ok
Collector.collect({:via, PartitionSupervisor, {MyApp.PartitionSupervisor, key}}, :b)
# ["current messages:", [:b, :a], " in process", #PID<0.603.0>]
:ok

现在让我们继续一个有用的示例。

DynamicSupervisor 示例

DynamicSupervisor 是一个负责启动其他进程的单个进程。在某些应用程序中,DynamicSupervisor 可能会成为瓶颈。为了解决这个问题,您可以通过 PartitionSupervisor 启动 DynamicSupervisor 的多个实例,然后选择一个“随机”实例来启动子进程。

不要启动单个 DynamicSupervisor

children = [
  {DynamicSupervisor, name: MyApp.DynamicSupervisor}
]

Supervisor.start_link(children, strategy: :one_for_one)

并直接在该动态监管程序上启动子进程

DynamicSupervisor.start_child(MyApp.DynamicSupervisor, {Agent, fn -> %{} end})

您可以将动态监管程序启动在 PartitionSupervisor 下。

children = [
  {PartitionSupervisor,
   child_spec: DynamicSupervisor,
   name: MyApp.DynamicSupervisors}
]

Supervisor.start_link(children, strategy: :one_for_one)

然后

DynamicSupervisor.start_child(
  {:via, PartitionSupervisor, {MyApp.DynamicSupervisors, self()}},
  {Agent, fn -> %{} end}
)

在上面的代码中,我们启动了一个分区监管程序,它默认情况下将为您的机器中的每个核心启动一个动态监管程序。然后,您不是按名称调用 DynamicSupervisor,而是使用 {:via, PartitionSupervisor, {name, key}} 格式通过分区监管程序调用它。我们选择了 self() 作为路由键,这意味着每个进程将被分配一个现有的动态监管程序。请参阅 start_link/1 以查看 PartitionSupervisor 支持的所有选项。

实现说明

PartitionSupervisor 使用 ETS 表或 Registry 来管理所有分区。在内部,PartitionSupervisor 为每个分区生成一个子进程规范,然后充当常规监管程序。每个子进程规范的 ID 都是分区号。

对于路由,使用两种策略。如果 key 是一个整数,则使用 rem(abs(key), partitions) 进行路由,其中 partitions 是分区数。否则,它使用 :erlang.phash2(key, partitions)。特定的路由将来可能会更改,因此不能依赖它。如果您想检索某个键的特定 PID,可以使用 GenServer.whereis({:via, PartitionSupervisor, {name, key}})

总结

函数

返回包含监管程序计数值的映射。

返回分区监管程序的分区数。

使用给定的选项启动分区监管程序。

使用给定的 reason 同步停止给定的分区监管程序。

返回一个包含所有子进程信息的列表。

类型

@type name() :: atom() | {:via, module(), term()}

PartitionSupervisor 的名称。

函数

链接到此函数

count_children(supervisor)

查看源代码 (自 1.14.0 起)
@spec count_children(name()) :: %{
  specs: non_neg_integer(),
  active: non_neg_integer(),
  supervisors: non_neg_integer(),
  workers: non_neg_integer()
}

返回包含监管程序计数值的映射。

该映射包含以下键

  • :specs - 分区(子进程)数量

  • :active - 此监管程序管理的所有正在运行的子进程的计数

  • :supervisors - 所有监管程序的计数,无论子进程是否仍然存活

  • :workers - 所有工作进程的计数,无论子进程是否仍然存活

链接到此函数

partitions(name)

查看源代码 (自 1.14.0 起)
@spec partitions(name()) :: pos_integer()

返回分区监管程序的分区数。

链接到此函数

start_link(opts)

查看源代码 (自 1.14.0 起)
@spec start_link(keyword()) :: Supervisor.on_start()

使用给定的选项启动分区监管程序。

此函数通常不会直接调用,而是当使用 PartitionSupervisor 作为另一个监管程序的子进程时调用

children = [
  {PartitionSupervisor, child_spec: SomeChild, name: MyPartitionSupervisor}
]

如果监管程序成功生成,此函数返回 {:ok, pid},其中 pid 是监管程序的 PID。如果分区监管程序的给定名称已分配给一个进程,则该函数返回 {:error, {:already_started, pid}},其中 pid 是该进程的 PID。

请注意,使用此函数启动的监管程序与父进程链接,并且不仅在崩溃时退出,而且在父进程以 :normal 原因退出时也会退出。

选项

  • :name - 一个原子或 via 元组,表示分区监管程序的名称(参见 name/0)。

  • :child_spec - 启动分区时要使用的子进程规范。

  • :partitions - 一个正整数,表示分区数。默认为 System.schedulers_online()(通常是核心数)。

  • :strategy - 重启策略选项,默认为 :one_for_one。您可以在 Supervisor 模块文档中了解有关策略的更多信息。

  • :max_restarts - 一段时间内允许的最大重启次数。默认为 3

  • :max_seconds - :max_restarts 适用的时间范围。默认为 5

  • :with_arguments - 一个有两个参数的匿名函数,允许将分区传递给子进程启动函数。请参见下面的 :with_arguments 部分。

:with_arguments

有时您希望每个分区都知道它们分配的分区号。这可以通过 :with_arguments 选项来完成。此函数接收 :child_spec 选项的值和一个整数作为分区号。它必须返回一个新的参数列表,该列表将用于启动分区进程。

例如,大多数进程通过调用 start_link(opts) 启动,其中 opts 是一个关键字列表。您可以将分区注入到传递给子进程的选项中。

with_arguments: fn [opts], partition ->
  [Keyword.put(opts, :partition, partition)]
end
链接到此函数

stop(supervisor, reason \\ :normal, timeout \\ :infinity)

查看源代码 (自 1.14.0 起)
@spec stop(name(), reason :: term(), timeout()) :: :ok

使用给定的 reason 同步停止给定的分区监管程序。

如果监管程序以给定的原因终止,它返回 :ok。如果它以其他原因终止,则调用退出。

此函数保留 OTP 关于错误报告的语义。如果原因不是 :normal:shutdown{:shutdown, _},则会记录错误报告。

链接到此函数

which_children(name)

查看源代码 (自 1.14.0 起)
@spec which_children(name()) :: [
  {:undefined, pid() | :restarting, :worker | :supervisor,
   [module()] | :dynamic}
]

返回一个包含所有子进程信息的列表。

此函数返回一个包含以下元组的列表

  • id - 分区号

  • child - 对应子进程的 PID,如果该进程即将重启,则为原子 :restarting

  • type - :worker:supervisor,如子进程规范中定义

  • modules - 如子进程规范中定义