查看源代码 Phoenix.Presence 行为 (Phoenix v1.7.14)

为进程和通道提供在线状态跟踪。

此行为提供在线状态功能,例如获取给定主题的在线状态,以及处理实时发生的加入和离开事件的差异。使用此模块定义一个主管和一个模块,该模块实现了 Phoenix.Tracker 行为,该行为使用 Phoenix.PubSub 来广播在线状态更新。

如果您只想使用 Phoenix.Presence 提供的功能的一部分,例如跟踪进程但不广播更新,建议您查看 phoenix_pubsub 项目中的 Phoenix.Tracker 功能。

示例用法

首先在您的应用程序中定义一个使用 Phoenix.Presence 的在线状态模块,并提供包含您的配置的 :otp_app 以及 :pubsub_server

defmodule MyAppWeb.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub
end

:pubsub_server 必须指向应用程序中运行的现有发布/订阅服务器,对于新应用程序,默认情况下包含为 MyApp.PubSub

接下来,在 lib/my_app/application.ex 中将新的主管添加到您的监督树中。它必须位于 PubSub 子项之后,端点之前。

children = [
  ...
  {Phoenix.PubSub, name: MyApp.PubSub},
  MyAppWeb.Presence,
  MyAppWeb.Endpoint
]

添加后,您可以在加入通道后跟踪在线状态。

defmodule MyAppWeb.MyChannel do
  use MyAppWeb, :channel
  alias MyAppWeb.Presence

  def join("some:topic", _params, socket) do
    send(self(), :after_join)
    {:ok, assign(socket, :user_id, ...)}
  end

  def handle_info(:after_join, socket) do
    {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
      online_at: inspect(System.system_time(:second))
    })

    push(socket, "presence_state", Presence.list(socket))
    {:noreply, socket}
  end
end

在上面的示例中,Presence.track 用于将此通道的进程注册为套接字用户 ID 的在线状态,以及一个元数据映射。接下来,套接字主题的当前在线状态信息作为 "presence_state" 事件推送到客户端。

最后,在线状态加入和离开事件的差异将作为它们实时发生的 "presence_diff" 事件发送到客户端。差异结构将是一个包含 :joins:leaves 的映射,形式如下

%{
  joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]}},
  leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
},

有关在线状态数据结构的更多信息,请参见 list/1

获取在线状态信息

在线状态元数据应尽量减少,并用于存储小型、短暂的状态,例如用户的 "在线" 或 "离线" 状态。更详细的信息(例如需要从数据库中获取的用户详细信息)可以通过覆盖 fetch/2 函数来实现。

当使用 list/1 以及每次更新时,都会触发 fetch/2 回调,它充当在将信息广播给所有频道订阅者之前一次获取在线状态信息的机制。这可以防止 N 查询问题,并为您提供一个集中位置来分组隔离的数据获取以扩展在线状态元数据。

该函数必须返回一个与概述的在线状态数据结构匹配的数据映射,包括 :metas 键,但可以扩展信息映射以包含任何其他信息。例如

def fetch(_topic, presences) do
  users = presences |> Map.keys() |> Accounts.get_users_map()

  for {key, %{metas: metas}} <- presences, into: %{} do
    {key, %{metas: metas, user: users[String.to_integer(key)]}}
  end
end

其中 Account.get_users_map/1 可以实现为

def get_users_map(ids) do
  query =
    from u in User,
      where: u.id in ^ids,
      select: {u.id, u}

  query |> Repo.all() |> Enum.into(%{})
end

上面的 fetch/2 函数从数据库中获取所有为给定主题注册了在线状态的用户。然后使用用户的 :user 键扩展在线状态信息,同时保留原始在线状态数据中必需的 :metas 字段。

使用 Elixir 作为在线状态客户端

在线状态非常适合外部客户端,例如 JavaScript 应用程序,但也可以从 Elixir 客户端进程中使用它来跟踪服务器上发生的在线状态更改。这可以通过在您的在线状态模块上实现可选的 init/1handle_metas/4 回调来完成。例如,以下回调接收在线状态元数据更改,并向其他 Elixir 进程广播有关用户加入和离开的信息

defmodule MyApp.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub

  def init(_opts) do
    {:ok, %{}} # user-land state
  end

  def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
    # fetch existing presence information for the joined users and broadcast the
    # event to all subscribers
    for {user_id, presence} <- joins do
      user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)}
      msg = {MyApp.PresenceClient, {:join, user_data}}
      Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
    end

    # fetch existing presence information for the left users and broadcast the
    # event to all subscribers
    for {user_id, presence} <- leaves do
      metas =
        case Map.fetch(presences, user_id) do
          {:ok, presence_metas} -> presence_metas
          :error -> []
        end

      user_data = %{user: presence.user, metas: metas}
      msg = {MyApp.PresenceClient, {:leave, user_data}}
      Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
    end

    {:ok, state}
  end
end

handle_metas/4 回调接收主题、在线状态差异、主题的当前在线状态及其元数据,以及从 init 和后续 handle_metas/4 调用中积累的任何用户端状态。在我们的示例实现中,我们遍历差异中的 :joins:leaves,并从我们已知的在线状态信息中填充完整的在线状态。然后,我们向本地节点订阅者广播有关用户加入和离开的信息。

使用在线状态进行测试

每次调用 fetch 回调时,它都是从一个单独的进程完成的。鉴于这些进程异步运行,通常需要确保它们在每个测试结束时都已关闭。这可以通过使用 ExUnit 的 on_exit 钩子加上 fetchers_pids 函数来完成

on_exit(fn ->
  for pid <- MyAppWeb.Presence.fetchers_pids() do
    ref = Process.monitor(pid)
    assert_receive {:DOWN, ^ref, _, _, _}, 1000
  end
end)

总结

回调

使用其他数据扩展在线状态信息。

返回套接字/主题-键对的在线状态元数据映射。

接收在线状态元数据更改。

初始化在线状态客户端状态。

返回套接字/主题的在线状态。

将通道的进程跟踪为在线状态。

将任意进程跟踪为在线状态。

停止跟踪通道的进程。

停止跟踪进程。

更新通道在线状态的元数据。

更新进程在线状态的元数据。

类型

@type presence() :: %{key: String.t(), meta: map()}
@type presences() :: %{required(String.t()) => %{metas: [map()]}}
@type topic() :: String.t()

回调

链接到此回调

fetch(topic, presences)

查看源代码
@callback fetch(topic(), presences()) :: presences()

使用其他数据扩展在线状态信息。

list/1 用于列出给定 topic 的所有在线状态时,此回调将触发一次以修改结果,然后将其广播给所有频道订阅者。这可以避免 N 查询问题,并提供一个集中位置来扩展在线状态元数据。您必须返回一个与原始结果匹配的数据映射,包括 :metas 键,但可以扩展映射以包含任何其他信息。

默认实现只是将 presences 不变地传递。

示例

def fetch(_topic, presences) do
  query =
    from u in User,
      where: u.id in ^Map.keys(presences),
      select: {u.id, u}

  users = query |> Repo.all() |> Enum.into(%{})
  for {key, %{metas: metas}} <- presences, into: %{} do
    {key, %{metas: metas, user: users[key]}}
  end
end
@callback get_by_key(Phoenix.Socket.t() | topic(), key :: String.t()) :: [presence()]

返回套接字/主题-键对的在线状态元数据映射。

示例

使用与 list/1 中每个在线状态相同的数据格式,但仅返回主题和键对下在线状态的元数据。例如,具有键 "user1" 的用户从两个设备连接到同一个聊天室 "room:1",可以返回

iex> MyPresence.get_by_key("room:1", "user1")
[%{name: "User 1", metas: [%{device: "Desktop"}, %{device: "Mobile"}]}]

list/1 一样,在线状态元数据将传递到在线状态模块的 fetch 回调中,以获取任何其他信息。

链接到此回调

handle_metas(topic, diff, presences, state)

查看源代码 (可选)
@callback handle_metas(
  topic :: String.t(),
  diff :: map(),
  presences :: map(),
  state :: term()
) ::
  {:ok, term()}

接收在线状态元数据更改。

@callback init(state :: term()) :: {:ok, new_state :: term()}

初始化在线状态客户端状态。

在线状态模块启动时调用,允许动态提供初始状态以处理在线状态元数据。

@callback list(socket_or_topic :: Phoenix.Socket.t() | topic()) :: presences()

返回套接字/主题的在线状态。

在线状态数据结构

在线状态信息作为映射返回,其中在线状态按键分组,转换为字符串,并累积元数据,其形式如下

%{key => %{metas: [%{phx_ref: ..., ...}, ...]}}

例如,假设一个 ID 为 123 的用户从两个不同的设备上线,以及一个 ID 为 456 的用户从一个设备上线。可能会返回以下在线状态信息

%{"123" => %{metas: [%{status: "away", phx_ref: ...},
                     %{status: "online", phx_ref: ...}]},
  "456" => %{metas: [%{status: "online", phx_ref: ...}]}}

映射的键通常指向资源 ID。该值将包含一个具有 :metas 键的映射,该映射包含每个资源的元数据列表。此外,每个元数据条目都将包含一个 :phx_ref 键,该键可用于唯一标识给定键的元数据。如果元数据之前已更新,则将存在一个 :phx_ref_prev 键,其中包含先前的 :phx_ref 值。

链接到此回调

track(socket, key, meta)

查看源代码
@callback track(socket :: Phoenix.Socket.t(), key :: String.t(), meta :: map()) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

将通道的进程跟踪为在线状态。

跟踪的在线状态按 key 分组,转换为字符串。例如,要将每个用户的频道分组在一起,请使用用户 ID 作为键。每个在线状态都可以与一个元数据映射相关联,以存储小型、短暂的状态,例如用户的在线状态。要存储详细信息,请参见 fetch/2

示例

alias MyApp.Presence
def handle_info(:after_join, socket) do
  {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
    online_at: inspect(System.system_time(:second))
  })
  {:noreply, socket}
end
链接到此回调

track(pid, topic, key, meta)

查看源代码
@callback track(pid(), topic(), key :: String.t(), meta :: map()) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

将任意进程跟踪为在线状态。

track/3 相同,但按 topickey 跟踪任何进程。

@callback untrack(socket :: Phoenix.Socket.t(), key :: String.t()) :: :ok

停止跟踪通道的进程。

链接到此回调

untrack(pid, topic, key)

查看源代码
@callback untrack(pid(), topic(), key :: String.t()) :: :ok

停止跟踪进程。

链接到此回调

update(socket, key, meta)

查看源代码
@callback update(
  socket :: Phoenix.Socket.t(),
  key :: String.t(),
  meta :: map() | (map() -> map())
) :: {:ok, ref :: binary()} | {:error, reason :: term()}

更新通道在线状态的元数据。

通过传递一个新的映射或一个函数来替换在线状态的元数据,该函数接受当前映射并返回一个新的映射。

链接到此回调

update(pid, topic, key, meta)

查看源代码
@callback update(pid(), topic(), key :: String.t(), meta :: map() | (map() -> map())) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

更新进程在线状态的元数据。

update/3 相同,但使用任意进程。