查看源代码 Phoenix.Tracker 行为 (Phoenix.PubSub v2.1.3)

为进程提供分布式状态跟踪。

Tracker 分片使用心跳协议和 CRDT 在集群中以最终一致的方式复制状态信息。在这个设计下,没有单一的事实来源或全局进程。每个节点都运行一组跟踪器,节点本地更改会在集群中复制,并在本地作为更改差异进行处理。

实现跟踪器

实现一个跟踪器

要启动一个跟踪器,首先将它添加到您的监督树中

children = [
  # ...
  {MyTracker, [name: MyTracker, pubsub_server: MyApp.PubSub]}
]

接下来,使用对 Phoenix.Tracker 行为回调的支持来实现 MyTracker。一个最小跟踪器的示例可能包括

defmodule MyTracker do
  use Phoenix.Tracker

  def start_link(opts) do
    opts = Keyword.merge([name: __MODULE__], opts)
    Phoenix.Tracker.start_link(__MODULE__, opts, opts)
  end

  def init(opts) do
    server = Keyword.fetch!(opts, :pubsub_server)
    {:ok, %{pubsub_server: server, node_name: Phoenix.PubSub.node_name(server)}}
  end

  def handle_diff(diff, state) do
    for {topic, {joins, leaves}} <- diff do
      for {key, meta} <- joins do
        IO.puts "presence join: key \"#{key}\" with meta #{inspect meta}"
        msg = {:join, key, meta}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
      end
      for {key, meta} <- leaves do
        IO.puts "presence leave: key \"#{key}\" with meta #{inspect meta}"
        msg = {:leave, key, meta}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
      end
    end
    {:ok, state}
  end
end

跟踪器必须实现 start_link/1init/1handle_diff/2init/1 回调允许跟踪器在 Phoenix.Tracker 服务器中运行时管理其自身状态。 handle_diff 回调将被调用,并带有一个由主题分组的出现加入和离开事件的差异。当副本心跳和复制数据时,本地跟踪器状态会与远程数据合并,并且差异会被发送到回调。处理程序可以使用此信息来通知订阅者有关事件的信息,如上所述。

一个可选的 handle_info/2 回调也可以被调用来处理跟踪器中的特定于应用程序的消息。

特殊注意事项

特殊注意事项

handle_diff/2 中的操作发生在 _跟踪器服务器的上下文中_。因此,应尽可能避免阻塞操作,并在需要时将其卸载到一个受监督的任务中。此外, handle_diff/2 中的崩溃会导致跟踪器服务器崩溃,因此可能导致服务器崩溃的操作应该使用 Task.Supervisor 生成的进程来卸载。

链接到本节 摘要

函数

返回在监督程序下启动此模块的规范。

获取在给定主题和键对下跟踪的状态。

通过向所有副本广播 permdown 来优雅地关闭。

列出在给定主题下跟踪的所有状态。

启动一个跟踪器池。

更新状态的元数据。

链接到本节 类型

@type presence() :: {key :: String.t(), meta :: map()}
@type topic() :: String.t()

链接到本节 回调

链接到此回调

handle_diff(map, state)

查看源代码
@callback handle_diff(
  %{required(topic()) => {joins :: [presence()], leaves :: [presence()]}},
  state :: term()
) :: {:ok, state :: term()}
链接到此回调

handle_info(message, state)

查看源代码 (可选)
@callback handle_info(message :: term(), state :: term()) :: {:noreply, state :: term()}
@callback init(Keyword.t()) :: {:ok, state :: term()} | {:error, reason :: term()}

链接到本节 函数

返回在监督程序下启动此模块的规范。

Supervisor.

链接到此函数

get_by_key(tracker_name, topic, key)

查看源代码
@spec get_by_key(atom(), topic(), term()) :: [presence()]

获取在给定主题和键对下跟踪的状态。

  • server_name - 跟踪器服务器的注册名称
  • topic - Phoenix.PubSub 主题
  • key - 状态的键

返回状态元数据的列表。

示例

示例

iex> Phoenix.Tracker.get_by_key(MyTracker, "lobby", "user1")
[{#PID<0.88.0>, %{name: "User 1"}, {#PID<0.89.0>, %{name: "User 1"}]
链接到此函数

graceful_permdown(tracker_name)

查看源代码
@spec graceful_permdown(atom()) :: :ok

通过向所有副本广播 permdown 来优雅地关闭。

示例

示例

iex> Phoenix.Tracker.graceful_permdown(MyTracker)
:ok
链接到此函数

list(tracker_name, topic)

查看源代码
@spec list(atom(), topic()) :: [presence()]

列出在给定主题下跟踪的所有状态。

  • server_name - 跟踪器服务器的注册名称
  • topic - Phoenix.PubSub 主题

返回键/元数据元组对的列表。

示例

示例

iex> Phoenix.Tracker.list(MyTracker, "lobby")
[{123, %{name: "user 123"}}, {456, %{name: "user 456"}}]
链接到此函数

start_link(tracker, tracker_arg, pool_opts)

查看源代码

启动一个跟踪器池。

  • tracker - 实现 Phoenix.Tracker 行为的跟踪器模块
  • tracker_arg - 传递给跟踪器处理程序 init/1 的参数
  • pool_opts - 用于构建分片池的选项列表

必需的 pool_opts

必需的 pool_opts

  • :name - 服务器的名称,例如: MyApp.Tracker 这也将形成所有分片名称的通用前缀
  • :pubsub_server - PubSub 服务器的名称,例如: MyApp.PubSub

可选的 pool_opts

可选的 pool_opts

  • :broadcast_period - 在集群中发送增量广播的时间间隔(以毫秒为单位)。默认值 1500
  • :max_silent_periods - 没有发送增量广播的广播周期数的最大整数。默认值 10(15 秒心跳)
  • :down_period - 将副本标记为暂时关闭的时间间隔(以毫秒为单位)。默认值 broadcast_period * max_silent_periods * 2(30 秒关闭检测)。注意:这必须至少是 broadcast_period 的 2 倍。
  • :permdown_period - 将副本标记为永久关闭并丢弃其状态的时间间隔(以毫秒为单位)。注意:这必须至少大于 down_period。默认值 1_200_000(20 分钟)
  • :clock_sample_periods - 在折叠和请求传输之前对远程时钟进行采样的心跳窗口数量。默认值 2
  • :max_delta_sizes - 在回退到发送整个状态之前保留的增量生成大小列表。默认为 [100, 1000, 10_000]
  • :log_level - 用于记录事件的日志级别,默认为 :debug,可以用 false 禁用
  • :pool_size - 要启动的跟踪器分片数量。默认值 1
链接到此函数

track(tracker_name, pid, topic, key, meta)

查看源代码
@spec track(atom(), pid(), topic(), term(), map()) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

跟踪状态。

  • server_name - 跟踪器服务器的注册名称
  • pid - 要跟踪的 Pid
  • topic - 此状态的 Phoenix.PubSub 主题
  • key - 标识此状态的键
  • meta - 要附加到此状态的元数据的映射

只要主题和键对对于给定进程的任何先前调用都是唯一的,进程就可以被多次跟踪。

示例

示例

iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:ok, "1WpAofWYIAA="}

iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:error, {:already_tracked, #PID<0.56.0>, "lobby", "123"}}
链接到此函数

untrack(tracker_name, pid)

查看源代码
链接到此函数

untrack(tracker_name, pid, topic, key)

查看源代码
@spec untrack(atom(), pid(), topic(), term()) :: :ok

取消跟踪状态。

  • server_name - 跟踪器服务器的注册名称
  • pid - 要取消跟踪的 Pid
  • topic - 要取消跟踪此状态的 Phoenix.PubSub 主题
  • key - 标识此状态的键

通过调用此函数的 Phoenix.Tracker.untrack/2 签名,可以取消跟踪给定 Pid 的所有状态。

示例

示例

iex> Phoenix.Tracker.untrack(MyTracker, self(), "lobby", u.id)
:ok
iex> Phoenix.Tracker.untrack(MyTracker, self())
:ok
链接到此函数

update(tracker_name, pid, topic, key, meta)

查看源代码
@spec update(atom(), pid(), topic(), term(), map() | (map() -> map())) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

更新状态的元数据。

  • server_name - 跟踪器服务器的注册名称
  • pid - 正在跟踪的 Pid
  • topic - 要更新此状态的 Phoenix.PubSub 主题
  • key - 标识此状态的键
  • meta - 要附加到此状态的新元数据映射,或者是一个函数。该函数将接收当前元数据作为输入,并且返回值将用作新的元数据

示例

示例

iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, %{stat: "zzz"})
{:ok, "1WpAofWYIAA="}

iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, fn meta -> Map.put(meta, :away, true) end)
{:ok, "1WpAofWYIAA="}