查看源代码 Phoenix.Tracker 行为 (Phoenix.PubSub v2.1.3)
为进程提供分布式状态跟踪。
Tracker 分片使用心跳协议和 CRDT 在集群中以最终一致的方式复制状态信息。在这个设计下,没有单一的事实来源或全局进程。每个节点都运行一组跟踪器,节点本地更改会在集群中复制,并在本地作为更改差异进行处理。
实现跟踪器
实现一个跟踪器
要启动一个跟踪器,首先将它添加到您的监督树中
children = [
# ...
{MyTracker, [name: MyTracker, pubsub_server: MyApp.PubSub]}
]
接下来,使用对 Phoenix.Tracker
行为回调的支持来实现 MyTracker
。一个最小跟踪器的示例可能包括
defmodule MyTracker do
use Phoenix.Tracker
def start_link(opts) do
opts = Keyword.merge([name: __MODULE__], opts)
Phoenix.Tracker.start_link(__MODULE__, opts, opts)
end
def init(opts) do
server = Keyword.fetch!(opts, :pubsub_server)
{:ok, %{pubsub_server: server, node_name: Phoenix.PubSub.node_name(server)}}
end
def handle_diff(diff, state) do
for {topic, {joins, leaves}} <- diff do
for {key, meta} <- joins do
IO.puts "presence join: key \"#{key}\" with meta #{inspect meta}"
msg = {:join, key, meta}
Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
end
for {key, meta} <- leaves do
IO.puts "presence leave: key \"#{key}\" with meta #{inspect meta}"
msg = {:leave, key, meta}
Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
end
end
{:ok, state}
end
end
跟踪器必须实现 start_link/1
、init/1
和 handle_diff/2
。 init/1
回调允许跟踪器在 Phoenix.Tracker
服务器中运行时管理其自身状态。 handle_diff
回调将被调用,并带有一个由主题分组的出现加入和离开事件的差异。当副本心跳和复制数据时,本地跟踪器状态会与远程数据合并,并且差异会被发送到回调。处理程序可以使用此信息来通知订阅者有关事件的信息,如上所述。
一个可选的 handle_info/2
回调也可以被调用来处理跟踪器中的特定于应用程序的消息。
特殊注意事项
特殊注意事项
handle_diff/2
中的操作发生在 _跟踪器服务器的上下文中_。因此,应尽可能避免阻塞操作,并在需要时将其卸载到一个受监督的任务中。此外, handle_diff/2
中的崩溃会导致跟踪器服务器崩溃,因此可能导致服务器崩溃的操作应该使用 Task.Supervisor
生成的进程来卸载。
链接到本节 摘要
链接到本节 类型
链接到本节 回调
链接到本节 函数
返回在监督程序下启动此模块的规范。
见 Supervisor
.
获取在给定主题和键对下跟踪的状态。
server_name
- 跟踪器服务器的注册名称topic
-Phoenix.PubSub
主题key
- 状态的键
返回状态元数据的列表。
示例
示例
iex> Phoenix.Tracker.get_by_key(MyTracker, "lobby", "user1")
[{#PID<0.88.0>, %{name: "User 1"}, {#PID<0.89.0>, %{name: "User 1"}]
@spec graceful_permdown(atom()) :: :ok
通过向所有副本广播 permdown 来优雅地关闭。
示例
示例
iex> Phoenix.Tracker.graceful_permdown(MyTracker)
:ok
列出在给定主题下跟踪的所有状态。
server_name
- 跟踪器服务器的注册名称topic
-Phoenix.PubSub
主题
返回键/元数据元组对的列表。
示例
示例
iex> Phoenix.Tracker.list(MyTracker, "lobby")
[{123, %{name: "user 123"}}, {456, %{name: "user 456"}}]
启动一个跟踪器池。
tracker
- 实现Phoenix.Tracker
行为的跟踪器模块tracker_arg
- 传递给跟踪器处理程序init/1
的参数pool_opts
- 用于构建分片池的选项列表
必需的 pool_opts
必需的 pool_opts
:name
- 服务器的名称,例如:MyApp.Tracker
这也将形成所有分片名称的通用前缀:pubsub_server
- PubSub 服务器的名称,例如:MyApp.PubSub
可选的 pool_opts
可选的 pool_opts
:broadcast_period
- 在集群中发送增量广播的时间间隔(以毫秒为单位)。默认值1500
:max_silent_periods
- 没有发送增量广播的广播周期数的最大整数。默认值10
(15 秒心跳):down_period
- 将副本标记为暂时关闭的时间间隔(以毫秒为单位)。默认值broadcast_period * max_silent_periods * 2
(30 秒关闭检测)。注意:这必须至少是broadcast_period
的 2 倍。:permdown_period
- 将副本标记为永久关闭并丢弃其状态的时间间隔(以毫秒为单位)。注意:这必须至少大于down_period
。默认值1_200_000
(20 分钟):clock_sample_periods
- 在折叠和请求传输之前对远程时钟进行采样的心跳窗口数量。默认值2
:max_delta_sizes
- 在回退到发送整个状态之前保留的增量生成大小列表。默认为[100, 1000, 10_000]
。:log_level
- 用于记录事件的日志级别,默认为:debug
,可以用false
禁用:pool_size
- 要启动的跟踪器分片数量。默认值1
@spec track(atom(), pid(), topic(), term(), map()) :: {:ok, ref :: binary()} | {:error, reason :: term()}
跟踪状态。
server_name
- 跟踪器服务器的注册名称pid
- 要跟踪的 Pidtopic
- 此状态的Phoenix.PubSub
主题key
- 标识此状态的键meta
- 要附加到此状态的元数据的映射
只要主题和键对对于给定进程的任何先前调用都是唯一的,进程就可以被多次跟踪。
示例
示例
iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:ok, "1WpAofWYIAA="}
iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:error, {:already_tracked, #PID<0.56.0>, "lobby", "123"}}
取消跟踪状态。
server_name
- 跟踪器服务器的注册名称pid
- 要取消跟踪的 Pidtopic
- 要取消跟踪此状态的Phoenix.PubSub
主题key
- 标识此状态的键
通过调用此函数的 Phoenix.Tracker.untrack/2
签名,可以取消跟踪给定 Pid 的所有状态。
示例
示例
iex> Phoenix.Tracker.untrack(MyTracker, self(), "lobby", u.id)
:ok
iex> Phoenix.Tracker.untrack(MyTracker, self())
:ok
@spec update(atom(), pid(), topic(), term(), map() | (map() -> map())) :: {:ok, ref :: binary()} | {:error, reason :: term()}
更新状态的元数据。
server_name
- 跟踪器服务器的注册名称pid
- 正在跟踪的 Pidtopic
- 要更新此状态的Phoenix.PubSub
主题key
- 标识此状态的键meta
- 要附加到此状态的新元数据映射,或者是一个函数。该函数将接收当前元数据作为输入,并且返回值将用作新的元数据
示例
示例
iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, %{stat: "zzz"})
{:ok, "1WpAofWYIAA="}
iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, fn meta -> Map.put(meta, :away, true) end)
{:ok, "1WpAofWYIAA="}