查看源代码 Phoenix.Channel 行为 (Phoenix v1.7.14)

定义一个 Phoenix Channel。

Channel 提供了一种机制,可以让客户端进行双向通信,并与 Phoenix.PubSub 层集成,实现近实时功能。

有关概念概述,请参阅 Channels 指南

主题 & 回调

每次加入 Channel 时,都需要选择要监听的特定主题。主题只是一个标识符,但按照惯例,它通常由两个部分组成:"topic:subtopic"。使用 "topic:subtopic" 方法与 Phoenix.Socket.channel/3 配合良好,允许通过使用通配符(* 字符)作为主题模式中的最后一个字符来匹配以给定前缀开头的所有主题

channel "room:*", MyAppWeb.RoomChannel

在上面的示例中,任何进入路由器并带有 "room:" 前缀的主题都将分派到 MyAppWeb.RoomChannel。主题也可以在 Channel 的 join/3 回调中进行模式匹配,以提取范围内的模式

# handles the special `"lobby"` subtopic
def join("room:lobby", _payload, socket) do
  {:ok, socket}
end

# handles any other subtopic as the room ID, for example `"room:12"`, `"room:34"`
def join("room:" <> room_id, _payload, socket) do
  {:ok, socket}
end

授权

客户端必须加入 Channel 才能在该 Channel 上发送和接收 PubSub 事件。您的 Channel 必须实现一个 join/3 回调,以授权给定主题的 Socket。例如,您可以检查用户是否有权加入该特定房间。

要在 join/3 中授权 Socket,请返回 {:ok, socket}。要在 join/3 中拒绝授权,请返回 {:error, reply}

传入事件

客户端成功加入 Channel 后,来自客户端的传入事件将通过 Channel 的 handle_in/3 回调进行路由。在这些回调中,您可以执行任何操作。传入回调必须返回 socket 以维护短暂状态。

通常,您会使用 broadcast!/3 将消息转发给所有监听者,或直接回复客户端事件以进行请求/响应式消息传递。

通用消息有效负载作为映射接收

def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  ...
  {:reply, :ok, socket}
end

二进制数据有效负载作为 {:binary, data} 元组传递

def handle_in("file_chunk", {:binary, chunk}, socket) do
  ...
  {:reply, :ok, socket}
end

广播

以下是如何从一个客户端接收传入的 "new_msg" 事件,并将消息广播给该 Socket 的所有主题订阅者的示例。

def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  broadcast!(socket, "new_msg", %{uid: uid, body: body})
  {:noreply, socket}
end

回复

回复对于确认客户端的消息或以操作结果进行响应很有用。回复仅发送到连接到当前 Channel 进程的客户端。在幕后,它们包含客户端消息 ref,这使客户端能够将收到的回复与其发送的消息相关联。

例如,想象一下创建资源并用创建的记录进行回复

def handle_in("create:post", attrs, socket) do
  changeset = Post.changeset(%Post{}, attrs)

  if changeset.valid? do
    post = Repo.insert!(changeset)
    response = MyAppWeb.PostView.render("show.json", %{post: post})
    {:reply, {:ok, response}, socket}
  else
    response = MyAppWeb.ChangesetView.render("errors.json", %{changeset: changeset})
    {:reply, {:error, response}, socket}
  end
end

或者您可能只想确认操作已成功

def handle_in("create:post", attrs, socket) do
  changeset = Post.changeset(%Post{}, attrs)

  if changeset.valid? do
    Repo.insert!(changeset)
    {:reply, :ok, socket}
  else
    {:reply, :error, socket}
  end
end

二进制数据也支持通过 {:binary, data} 元组进行回复

{:reply, {:ok, {:binary, bin}}, socket}

如果您不想向客户端发送回复,您可以返回

{:noreply, socket}

您可能这样做的一种情况是,如果您需要稍后进行回复;请参阅 reply/2

推送

调用 push/3 允许您向客户端发送一条消息,该消息不是对特定客户端消息的回复。因为它不是回复,所以推送的消息不包含客户端消息 ref;没有先前的客户端消息与之相关。

可能的用例包括通知客户端

  • 您已自动保存了用户的文档
  • 用户的游戏即将结束
  • IoT 设备的设置应更新

例如,您可以在收到与他们相关的 PubSub 消息后,在 handle_info/3 中向客户端 push/3 消息。

alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
  push(socket, event, payload)
  {:noreply, socket}
end

推送数据可以以映射或带标签的 {:binary, data} 元组的形式给出

# client asks for their current rank. reply contains it, and client
# is also pushed a leader board and a badge image
def handle_in("current_rank", _, socket) do
  push(socket, "leaders", %{leaders: Game.get_leaders(socket.assigns.game_id)})
  push(socket, "badge", {:binary, File.read!(socket.assigns.badge_path)})
  {:reply, %{val: Game.get_rank(socket.assigns[:user])}, socket}
end

请注意,在此示例中,push/3 是从 handle_in/3 中调用的;这样,您基本上可以对来自客户端的单个消息进行 N 次回复。请参阅 reply/2,了解为什么这样做可能很方便。

拦截传出事件

当使用 broadcast/3 广播事件时,每个 Channel 订阅者可以选择拦截该事件并触发其 handle_out/3 回调。这允许事件的有效负载在每个 Socket 基础上进行自定义,以附加额外信息,或有条件地过滤掉要传递的消息。如果事件未通过 Phoenix.Channel.intercept/1 拦截,则消息将直接推送到客户端

intercept ["new_msg", "user_joined"]

# for every socket subscribing to this topic, append an `is_editable`
# value for client metadata.
def handle_out("new_msg", msg, socket) do
  push(socket, "new_msg", Map.merge(msg,
    %{is_editable: User.can_edit_message?(socket.assigns[:user], msg)}
  ))
  {:noreply, socket}
end

# do not send broadcasted `"user_joined"` events if this socket's user
# is ignoring the user who joined.
def handle_out("user_joined", msg, socket) do
  unless User.ignoring?(socket.assigns[:user], msg.user_id) do
    push(socket, "user_joined", msg)
  end
  {:noreply, socket}
end

广播到外部主题

在某些情况下,您可能希望在没有 socket 上下文的情况下广播消息。这可能是为了从您的 Channel 内部广播到外部主题,或者从应用程序中的其他地方(如控制器或另一个进程)广播。可以通过您的端点执行此操作

# within channel
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  ...
  broadcast_from!(socket, "new_msg", %{uid: uid, body: body})
  MyAppWeb.Endpoint.broadcast_from!(self(), "room:superadmin",
    "new_msg", %{uid: uid, body: body})
  {:noreply, socket}
end

# within controller
def create(conn, params) do
  ...
  MyAppWeb.Endpoint.broadcast!("room:" <> rid, "new_msg", %{uid: uid, body: body})
  MyAppWeb.Endpoint.broadcast!("room:superadmin", "new_msg", %{uid: uid, body: body})
  redirect(conn, to: "/")
end

终止

终止时,将调用 Channel 回调 terminate/2,并传入错误原因和 Socket。

如果我们正在终止,因为客户端离开了,那么原因将是 {:shutdown, :left}。同样,如果我们正在终止,因为客户端连接已关闭,那么原因将是 {:shutdown, :closed}

如果任何回调返回一个 :stop 元组,它也会触发终止,并传入元组中给出的原因。

terminate/2 不会在错误的情况下或退出情况下调用。这与您在 Elixir 抽象(如 GenServer 等)中发现的行为相同。与 GenServer 类似,也可以 :trap_exit 来保证调用 terminate/2。但并不鼓励这种做法。

一般来说,如果您想清理一些东西,最好监视您的 Channel 进程并从另一个进程进行清理。所有 Channel 回调(包括 join/3)都在 Channel 进程内部调用。因此,它们中的任何一个中的 self() 都将返回要监视的 PID。

停止 Channel 时退出原因

当 Channel 回调返回一个 :stop 元组时,例如

{:stop, :shutdown, socket}
{:stop, {:error, :enoent}, socket}

第二个参数是退出原因,它遵循与标准 GenServer 退出相同的行为。

在关闭 Channel 时,您可以从以下三种选项中选择

  • :normal - 在这种情况下,退出不会被记录,并且链接的进程不会退出

  • :shutdown{:shutdown, term} - 在这种情况下,退出不会被记录,并且链接的进程会以相同的原因退出,除非它们正在捕获退出

  • 任何其他项 - 在这种情况下,退出将被记录,并且链接的进程会以相同的原因退出,除非它们正在捕获退出

订阅外部主题

有时,您可能需要以编程方式将 Socket 订阅到除了内部 socket.topic 之外的外部主题。例如,想象一下您有一个竞拍系统,其中远程客户端动态地设置他们想要接收竞拍通知的产品的偏好。与为每个偏好需要一个唯一的 Channel 进程和主题相比,一个更有效且更简单的方法是通过您的端点将单个 Channel 订阅到相关的通知。例如

defmodule MyAppWeb.Endpoint.NotificationChannel do
  use Phoenix.Channel

  def join("notification:" <> user_id, %{"ids" => ids}, socket) do
    topics = for product_id <- ids, do: "product:#{product_id}"

    {:ok, socket
          |> assign(:topics, [])
          |> put_new_topics(topics)}
  end

  def handle_in("watch", %{"product_id" => id}, socket) do
    {:reply, :ok, put_new_topics(socket, ["product:#{id}"])}
  end

  def handle_in("unwatch", %{"product_id" => id}, socket) do
    {:reply, :ok, MyAppWeb.Endpoint.unsubscribe("product:#{id}")}
  end

  defp put_new_topics(socket, topics) do
    Enum.reduce(topics, socket, fn topic, acc ->
      topics = acc.assigns.topics
      if topic in topics do
        acc
      else
        :ok = MyAppWeb.Endpoint.subscribe(topic)
        assign(acc, :topics, [topic | topics])
      end
    end)
  end
end

注意:调用者必须负责防止重复订阅。在从您的端点调用 subscribe/1 之后,相同流程适用于在您的 Channel 中处理常规 Elixir 消息。最常见的是,您只需转发 %Phoenix.Socket.Broadcast{} 事件和有效负载

alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
  push(socket, event, payload)
  {:noreply, socket}
end

休眠

从 Erlang/OTP 20 开始,Channel 在 15_000 毫秒的空闲时间后会自动休眠以节省内存。这可以通过将 :hibernate_after 选项传递给 use Phoenix.Channel 来自定义

use Phoenix.Channel, hibernate_after: 60_000

您也可以将其设置为 :infinity 以完全禁用它。

关闭

您可以通过在使用时设置 :shutdown 值来配置应用程序关闭时使用的每个 Channel 的关闭行为

use Phoenix.Channel, shutdown: 5_000

它默认为 5_000。受支持的值在 Supervisor 模块文档中进行了描述。

记录

默认情况下,Channel "join""handle_in" 事件使用 :info:debug 级别分别进行记录。您可以更改用于每个事件的级别,或通过在使用 Phoenix.Channel 时设置 :log_join:log_handle_in 选项来禁用每个事件类型的日志。例如,以下配置将加入事件记录为 :info,但会禁用传入事件的记录

use Phoenix.Channel, log_join: :info, log_handle_in: false

请注意,更改事件类型的级别不会影响记录的内容,除非您将其设置为 false,否则它会影响关联的级别。

总结

回调

处理常规 GenServer 调用消息。

处理常规 GenServer cast 消息。

处理传入的 event

处理常规 Elixir 进程消息。

拦截传出的 event

处理通过 topic 加入的频道。

在频道进程即将退出时调用。

函数

将事件广播给套接字主题的所有订阅者。

broadcast/3 相同,但如果广播失败,则会引发异常。

从 pid 广播事件到套接字主题的所有订阅者。

broadcast_from/3 相同,但如果广播失败,则会引发异常。

定义要为 handle_out/3 回调拦截的频道事件。

将事件直接发送到已连接的客户端,无需客户端先前的消息。

异步回复套接字推送。

为异步回复生成 socket_ref

类型

@type payload() :: map() | term() | {:binary, binary()}
@type reply() :: status :: atom() | {status :: atom(), response :: payload()}
@type socket_ref() ::
  {transport_pid :: Pid, serializer :: module(), topic :: binary(),
   ref :: binary(), join_ref :: binary()}

回调

链接到此回调

code_change(old_vsn, t, extra)

查看源代码 (可选)
@callback code_change(old_vsn, Phoenix.Socket.t(), extra :: term()) ::
  {:ok, Phoenix.Socket.t()} | {:error, reason :: term()}
when old_vsn: term() | {:down, term()}
链接到此回调

handle_call(msg, from, socket)

查看源代码 (可选)
@callback handle_call(
  msg :: term(),
  from :: {pid(), tag :: term()},
  socket :: Phoenix.Socket.t()
) ::
  {:reply, response :: term(), Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t()}
  | {:stop, reason :: term(), Phoenix.Socket.t()}

处理常规 GenServer 调用消息。

参见 GenServer.handle_call/3.

链接到此回调

handle_cast(msg, socket)

查看源代码 (可选)
@callback handle_cast(msg :: term(), socket :: Phoenix.Socket.t()) ::
  {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}

处理常规 GenServer cast 消息。

参见 GenServer.handle_cast/2.

链接到此回调

handle_in(event, payload, socket)

查看源代码 (可选)
@callback handle_in(
  event :: String.t(),
  payload :: payload(),
  socket :: Phoenix.Socket.t()
) ::
  {:noreply, Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
  | {:reply, reply(), Phoenix.Socket.t()}
  | {:stop, reason :: term(), Phoenix.Socket.t()}
  | {:stop, reason :: term(), reply(), Phoenix.Socket.t()}

处理传入的 event

使用配置的序列化器在发送之前序列化有效负载。

示例

def handle_in("ping", payload, socket) do
  {:reply, {:ok, payload}, socket}
end
链接到此回调

handle_info(msg, socket)

查看源代码 (可选)
@callback handle_info(msg :: term(), socket :: Phoenix.Socket.t()) ::
  {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}

处理常规 Elixir 进程消息。

参见 GenServer.handle_info/2.

链接到此回调

handle_out(event, payload, socket)

查看源代码 (可选)
@callback handle_out(
  event :: String.t(),
  payload :: payload(),
  socket :: Phoenix.Socket.t()
) ::
  {:noreply, Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
  | {:stop, reason :: term(), Phoenix.Socket.t()}

拦截传出的 event

参见 intercept/1.

链接到此回调

join(topic, payload, socket)

查看源代码
@callback join(topic :: binary(), payload :: payload(), socket :: Phoenix.Socket.t()) ::
  {:ok, Phoenix.Socket.t()}
  | {:ok, reply :: payload(), Phoenix.Socket.t()}
  | {:error, reason :: map()}

处理通过 topic 加入的频道。

要授权套接字,请返回 {:ok, socket}{:ok, reply, socket}。要拒绝授权,请返回 {:error, reason}

使用配置的序列化器在发送之前序列化有效负载。

示例

def join("room:lobby", payload, socket) do
  if authorized?(payload) do
    {:ok, socket}
  else
    {:error, %{reason: "unauthorized"}}
  end
end
链接到此回调

terminate(reason, t)

查看源代码 (可选)
@callback terminate(
  reason :: :normal | :shutdown | {:shutdown, :left | :closed | term()},
  Phoenix.Socket.t()
) :: term()

在频道进程即将退出时调用。

参见 GenServer.terminate/2.

函数

链接到此函数

broadcast(socket, event, message)

查看源代码

将事件广播给套接字主题的所有订阅者。

事件的消息必须是可序列化的映射或标记的 {:binary, data} 元组,其中 data 是二进制数据。

示例

iex> broadcast(socket, "new_message", %{id: 1, content: "hello"})
:ok

iex> broadcast(socket, "new_message", {:binary, "hello"})
:ok
链接到此函数

broadcast!(socket, event, message)

查看源代码

broadcast/3 相同,但如果广播失败,则会引发异常。

链接到此函数

broadcast_from(socket, event, message)

查看源代码

从 pid 广播事件到套接字主题的所有订阅者。

拥有套接字的频道不会收到已发布的消息。事件的消息必须是可序列化的映射或标记的 {:binary, data} 元组,其中 data 是二进制数据。

示例

iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok

iex> broadcast_from(socket, "new_message", {:binary, "hello"})
:ok
链接到此函数

broadcast_from!(socket, event, message)

查看源代码

broadcast_from/3 相同,但如果广播失败,则会引发异常。

链接到此宏

intercept(events)

查看源代码 (宏)

定义要为 handle_out/3 回调拦截的频道事件。

默认情况下,广播的事件会直接推送到客户端,但拦截事件可以让您的频道有机会为客户端自定义事件,以添加额外信息或过滤要传递的消息。

注意:如果大量订阅者必须自定义消息,则拦截事件会导致显著的开销,因为广播将被编码 N 次,而不是在所有订阅者之间进行一次共享编码。

示例

intercept ["new_msg"]

def handle_out("new_msg", payload, socket) do
  push(socket, "new_msg", Map.merge(payload,
    is_editable: User.can_edit_message?(socket.assigns[:user], payload)
  ))
  {:noreply, socket}
end

handle_out/3 回调必须返回以下内容之一

{:noreply, Socket.t} |
{:noreply, Socket.t, timeout | :hibernate} |
{:stop, reason :: term, Socket.t}
链接到此函数

push(socket, event, message)

查看源代码

将事件直接发送到已连接的客户端,无需客户端先前的消息。

事件的消息必须是可序列化的映射或标记的 {:binary, data} 元组,其中 data 是二进制数据。

请注意,与某些客户端库不同,此服务器端 push/3 不会返回引用。如果您需要从客户端获取回复并将该回复与您推送的消息相关联,则需要在消息中包含一个唯一的标识符,在频道的状态中跟踪它,让客户端在回复中包含它,并在回复到达 handle_in/3 时检查 ref。

示例

iex> push(socket, "new_message", %{id: 1, content: "hello"})
:ok

iex> push(socket, "new_message", {:binary, "hello"})
:ok
链接到此函数

reply(socket_ref, status)

查看源代码
@spec reply(socket_ref(), reply()) :: :ok

异步回复套接字推送。

回复客户端消息的通常方法是从 handle_in/3 返回一个元组,例如

{:reply, {status, payload}, socket}

但有时您需要异步回复推送 - 也就是说,在您的 handle_in/3 回调完成之后。例如,您可能需要在另一个进程中执行工作,并在工作完成后回复。

您可以使用 socket_ref/1 为套接字生成引用,并在准备回复时使用该引用调用 reply/2

注意:需要 socket_ref,这样 socket 本身就不会泄露到频道之外。该 socket 包含信息,例如分配和传输配置,因此不要将这些信息复制到拥有它的频道之外非常重要。

从技术上讲,reply/2 允许您多次回复同一个客户端消息,并且每个回复都将包含客户端消息 ref。但客户端可能只希望收到一个回复;在这种情况下,对于其他消息,push/3 会更好。

使用配置的序列化器在发送之前序列化有效负载。

示例

def handle_in("work", payload, socket) do
  Worker.perform(payload, socket_ref(socket))
  {:noreply, socket}
end

def handle_info({:work_complete, result, ref}, socket) do
  reply(ref, {:ok, result})
  {:noreply, socket}
end
@spec socket_ref(Phoenix.Socket.t()) :: socket_ref()

为异步回复生成 socket_ref

有关示例用法,请参见 reply/2