查看源代码 Phoenix.Channel 行为 (Phoenix v1.7.14)
定义一个 Phoenix Channel。
Channel 提供了一种机制,可以让客户端进行双向通信,并与 Phoenix.PubSub
层集成,实现近实时功能。
有关概念概述,请参阅 Channels 指南。
主题 & 回调
每次加入 Channel 时,都需要选择要监听的特定主题。主题只是一个标识符,但按照惯例,它通常由两个部分组成:"topic:subtopic"
。使用 "topic:subtopic"
方法与 Phoenix.Socket.channel/3
配合良好,允许通过使用通配符(*
字符)作为主题模式中的最后一个字符来匹配以给定前缀开头的所有主题
channel "room:*", MyAppWeb.RoomChannel
在上面的示例中,任何进入路由器并带有 "room:"
前缀的主题都将分派到 MyAppWeb.RoomChannel
。主题也可以在 Channel 的 join/3
回调中进行模式匹配,以提取范围内的模式
# handles the special `"lobby"` subtopic
def join("room:lobby", _payload, socket) do
{:ok, socket}
end
# handles any other subtopic as the room ID, for example `"room:12"`, `"room:34"`
def join("room:" <> room_id, _payload, socket) do
{:ok, socket}
end
授权
客户端必须加入 Channel 才能在该 Channel 上发送和接收 PubSub 事件。您的 Channel 必须实现一个 join/3
回调,以授权给定主题的 Socket。例如,您可以检查用户是否有权加入该特定房间。
要在 join/3
中授权 Socket,请返回 {:ok, socket}
。要在 join/3
中拒绝授权,请返回 {:error, reply}
。
传入事件
客户端成功加入 Channel 后,来自客户端的传入事件将通过 Channel 的 handle_in/3
回调进行路由。在这些回调中,您可以执行任何操作。传入回调必须返回 socket
以维护短暂状态。
通常,您会使用 broadcast!/3
将消息转发给所有监听者,或直接回复客户端事件以进行请求/响应式消息传递。
通用消息有效负载作为映射接收
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
...
{:reply, :ok, socket}
end
二进制数据有效负载作为 {:binary, data}
元组传递
def handle_in("file_chunk", {:binary, chunk}, socket) do
...
{:reply, :ok, socket}
end
广播
以下是如何从一个客户端接收传入的 "new_msg"
事件,并将消息广播给该 Socket 的所有主题订阅者的示例。
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
broadcast!(socket, "new_msg", %{uid: uid, body: body})
{:noreply, socket}
end
回复
回复对于确认客户端的消息或以操作结果进行响应很有用。回复仅发送到连接到当前 Channel 进程的客户端。在幕后,它们包含客户端消息 ref
,这使客户端能够将收到的回复与其发送的消息相关联。
例如,想象一下创建资源并用创建的记录进行回复
def handle_in("create:post", attrs, socket) do
changeset = Post.changeset(%Post{}, attrs)
if changeset.valid? do
post = Repo.insert!(changeset)
response = MyAppWeb.PostView.render("show.json", %{post: post})
{:reply, {:ok, response}, socket}
else
response = MyAppWeb.ChangesetView.render("errors.json", %{changeset: changeset})
{:reply, {:error, response}, socket}
end
end
或者您可能只想确认操作已成功
def handle_in("create:post", attrs, socket) do
changeset = Post.changeset(%Post{}, attrs)
if changeset.valid? do
Repo.insert!(changeset)
{:reply, :ok, socket}
else
{:reply, :error, socket}
end
end
二进制数据也支持通过 {:binary, data}
元组进行回复
{:reply, {:ok, {:binary, bin}}, socket}
如果您不想向客户端发送回复,您可以返回
{:noreply, socket}
您可能这样做的一种情况是,如果您需要稍后进行回复;请参阅 reply/2
。
推送
调用 push/3
允许您向客户端发送一条消息,该消息不是对特定客户端消息的回复。因为它不是回复,所以推送的消息不包含客户端消息 ref
;没有先前的客户端消息与之相关。
可能的用例包括通知客户端
- 您已自动保存了用户的文档
- 用户的游戏即将结束
- IoT 设备的设置应更新
例如,您可以在收到与他们相关的 PubSub
消息后,在 handle_info/3
中向客户端 push/3
消息。
alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
push(socket, event, payload)
{:noreply, socket}
end
推送数据可以以映射或带标签的 {:binary, data}
元组的形式给出
# client asks for their current rank. reply contains it, and client
# is also pushed a leader board and a badge image
def handle_in("current_rank", _, socket) do
push(socket, "leaders", %{leaders: Game.get_leaders(socket.assigns.game_id)})
push(socket, "badge", {:binary, File.read!(socket.assigns.badge_path)})
{:reply, %{val: Game.get_rank(socket.assigns[:user])}, socket}
end
请注意,在此示例中,push/3
是从 handle_in/3
中调用的;这样,您基本上可以对来自客户端的单个消息进行 N 次回复。请参阅 reply/2
,了解为什么这样做可能很方便。
拦截传出事件
当使用 broadcast/3
广播事件时,每个 Channel 订阅者可以选择拦截该事件并触发其 handle_out/3
回调。这允许事件的有效负载在每个 Socket 基础上进行自定义,以附加额外信息,或有条件地过滤掉要传递的消息。如果事件未通过 Phoenix.Channel.intercept/1
拦截,则消息将直接推送到客户端
intercept ["new_msg", "user_joined"]
# for every socket subscribing to this topic, append an `is_editable`
# value for client metadata.
def handle_out("new_msg", msg, socket) do
push(socket, "new_msg", Map.merge(msg,
%{is_editable: User.can_edit_message?(socket.assigns[:user], msg)}
))
{:noreply, socket}
end
# do not send broadcasted `"user_joined"` events if this socket's user
# is ignoring the user who joined.
def handle_out("user_joined", msg, socket) do
unless User.ignoring?(socket.assigns[:user], msg.user_id) do
push(socket, "user_joined", msg)
end
{:noreply, socket}
end
广播到外部主题
在某些情况下,您可能希望在没有 socket
上下文的情况下广播消息。这可能是为了从您的 Channel 内部广播到外部主题,或者从应用程序中的其他地方(如控制器或另一个进程)广播。可以通过您的端点执行此操作
# within channel
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
...
broadcast_from!(socket, "new_msg", %{uid: uid, body: body})
MyAppWeb.Endpoint.broadcast_from!(self(), "room:superadmin",
"new_msg", %{uid: uid, body: body})
{:noreply, socket}
end
# within controller
def create(conn, params) do
...
MyAppWeb.Endpoint.broadcast!("room:" <> rid, "new_msg", %{uid: uid, body: body})
MyAppWeb.Endpoint.broadcast!("room:superadmin", "new_msg", %{uid: uid, body: body})
redirect(conn, to: "/")
end
终止
终止时,将调用 Channel 回调 terminate/2
,并传入错误原因和 Socket。
如果我们正在终止,因为客户端离开了,那么原因将是 {:shutdown, :left}
。同样,如果我们正在终止,因为客户端连接已关闭,那么原因将是 {:shutdown, :closed}
。
如果任何回调返回一个 :stop
元组,它也会触发终止,并传入元组中给出的原因。
terminate/2
不会在错误的情况下或退出情况下调用。这与您在 Elixir 抽象(如 GenServer
等)中发现的行为相同。与 GenServer
类似,也可以 :trap_exit
来保证调用 terminate/2
。但并不鼓励这种做法。
一般来说,如果您想清理一些东西,最好监视您的 Channel 进程并从另一个进程进行清理。所有 Channel 回调(包括 join/3
)都在 Channel 进程内部调用。因此,它们中的任何一个中的 self()
都将返回要监视的 PID。
停止 Channel 时退出原因
当 Channel 回调返回一个 :stop
元组时,例如
{:stop, :shutdown, socket}
{:stop, {:error, :enoent}, socket}
第二个参数是退出原因,它遵循与标准 GenServer
退出相同的行为。
在关闭 Channel 时,您可以从以下三种选项中选择
:normal
- 在这种情况下,退出不会被记录,并且链接的进程不会退出:shutdown
或{:shutdown, term}
- 在这种情况下,退出不会被记录,并且链接的进程会以相同的原因退出,除非它们正在捕获退出任何其他项 - 在这种情况下,退出将被记录,并且链接的进程会以相同的原因退出,除非它们正在捕获退出
订阅外部主题
有时,您可能需要以编程方式将 Socket 订阅到除了内部 socket.topic
之外的外部主题。例如,想象一下您有一个竞拍系统,其中远程客户端动态地设置他们想要接收竞拍通知的产品的偏好。与为每个偏好需要一个唯一的 Channel 进程和主题相比,一个更有效且更简单的方法是通过您的端点将单个 Channel 订阅到相关的通知。例如
defmodule MyAppWeb.Endpoint.NotificationChannel do
use Phoenix.Channel
def join("notification:" <> user_id, %{"ids" => ids}, socket) do
topics = for product_id <- ids, do: "product:#{product_id}"
{:ok, socket
|> assign(:topics, [])
|> put_new_topics(topics)}
end
def handle_in("watch", %{"product_id" => id}, socket) do
{:reply, :ok, put_new_topics(socket, ["product:#{id}"])}
end
def handle_in("unwatch", %{"product_id" => id}, socket) do
{:reply, :ok, MyAppWeb.Endpoint.unsubscribe("product:#{id}")}
end
defp put_new_topics(socket, topics) do
Enum.reduce(topics, socket, fn topic, acc ->
topics = acc.assigns.topics
if topic in topics do
acc
else
:ok = MyAppWeb.Endpoint.subscribe(topic)
assign(acc, :topics, [topic | topics])
end
end)
end
end
注意:调用者必须负责防止重复订阅。在从您的端点调用 subscribe/1
之后,相同流程适用于在您的 Channel 中处理常规 Elixir 消息。最常见的是,您只需转发 %Phoenix.Socket.Broadcast{}
事件和有效负载
alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
push(socket, event, payload)
{:noreply, socket}
end
休眠
从 Erlang/OTP 20 开始,Channel 在 15_000 毫秒的空闲时间后会自动休眠以节省内存。这可以通过将 :hibernate_after
选项传递给 use Phoenix.Channel
来自定义
use Phoenix.Channel, hibernate_after: 60_000
您也可以将其设置为 :infinity
以完全禁用它。
关闭
您可以通过在使用时设置 :shutdown
值来配置应用程序关闭时使用的每个 Channel 的关闭行为
use Phoenix.Channel, shutdown: 5_000
它默认为 5_000。受支持的值在 Supervisor
模块文档中进行了描述。
记录
默认情况下,Channel "join"
和 "handle_in"
事件使用 :info
和 :debug
级别分别进行记录。您可以更改用于每个事件的级别,或通过在使用 Phoenix.Channel
时设置 :log_join
和 :log_handle_in
选项来禁用每个事件类型的日志。例如,以下配置将加入事件记录为 :info
,但会禁用传入事件的记录
use Phoenix.Channel, log_join: :info, log_handle_in: false
请注意,更改事件类型的级别不会影响记录的内容,除非您将其设置为 false
,否则它会影响关联的级别。
总结
回调
处理常规 GenServer 调用消息。
处理常规 GenServer cast 消息。
处理传入的 event
。
处理常规 Elixir 进程消息。
拦截传出的 event
。
处理通过 topic
加入的频道。
在频道进程即将退出时调用。
函数
将事件广播给套接字主题的所有订阅者。
与 broadcast/3
相同,但如果广播失败,则会引发异常。
从 pid 广播事件到套接字主题的所有订阅者。
与 broadcast_from/3
相同,但如果广播失败,则会引发异常。
定义要为 handle_out/3
回调拦截的频道事件。
将事件直接发送到已连接的客户端,无需客户端先前的消息。
异步回复套接字推送。
为异步回复生成 socket_ref
。
类型
回调
@callback code_change(old_vsn, Phoenix.Socket.t(), extra :: term()) :: {:ok, Phoenix.Socket.t()} | {:error, reason :: term()} when old_vsn: term() | {:down, term()}
@callback handle_call( msg :: term(), from :: {pid(), tag :: term()}, socket :: Phoenix.Socket.t() ) :: {:reply, response :: term(), Phoenix.Socket.t()} | {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
处理常规 GenServer 调用消息。
@callback handle_cast(msg :: term(), socket :: Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
处理常规 GenServer cast 消息。
@callback handle_in( event :: String.t(), payload :: payload(), socket :: Phoenix.Socket.t() ) :: {:noreply, Phoenix.Socket.t()} | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate} | {:reply, reply(), Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()} | {:stop, reason :: term(), reply(), Phoenix.Socket.t()}
处理传入的 event
。
使用配置的序列化器在发送之前序列化有效负载。
示例
def handle_in("ping", payload, socket) do
{:reply, {:ok, payload}, socket}
end
@callback handle_info(msg :: term(), socket :: Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
处理常规 Elixir 进程消息。
@callback handle_out( event :: String.t(), payload :: payload(), socket :: Phoenix.Socket.t() ) :: {:noreply, Phoenix.Socket.t()} | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate} | {:stop, reason :: term(), Phoenix.Socket.t()}
拦截传出的 event
。
参见 intercept/1
.
@callback join(topic :: binary(), payload :: payload(), socket :: Phoenix.Socket.t()) :: {:ok, Phoenix.Socket.t()} | {:ok, reply :: payload(), Phoenix.Socket.t()} | {:error, reason :: map()}
处理通过 topic
加入的频道。
要授权套接字,请返回 {:ok, socket}
或 {:ok, reply, socket}
。要拒绝授权,请返回 {:error, reason}
。
使用配置的序列化器在发送之前序列化有效负载。
示例
def join("room:lobby", payload, socket) do
if authorized?(payload) do
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
@callback terminate( reason :: :normal | :shutdown | {:shutdown, :left | :closed | term()}, Phoenix.Socket.t() ) :: term()
在频道进程即将退出时调用。
函数
将事件广播给套接字主题的所有订阅者。
事件的消息必须是可序列化的映射或标记的 {:binary, data}
元组,其中 data
是二进制数据。
示例
iex> broadcast(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> broadcast(socket, "new_message", {:binary, "hello"})
:ok
与 broadcast/3
相同,但如果广播失败,则会引发异常。
从 pid 广播事件到套接字主题的所有订阅者。
拥有套接字的频道不会收到已发布的消息。事件的消息必须是可序列化的映射或标记的 {:binary, data}
元组,其中 data
是二进制数据。
示例
iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> broadcast_from(socket, "new_message", {:binary, "hello"})
:ok
与 broadcast_from/3
相同,但如果广播失败,则会引发异常。
定义要为 handle_out/3
回调拦截的频道事件。
默认情况下,广播的事件会直接推送到客户端,但拦截事件可以让您的频道有机会为客户端自定义事件,以添加额外信息或过滤要传递的消息。
注意:如果大量订阅者必须自定义消息,则拦截事件会导致显著的开销,因为广播将被编码 N 次,而不是在所有订阅者之间进行一次共享编码。
示例
intercept ["new_msg"]
def handle_out("new_msg", payload, socket) do
push(socket, "new_msg", Map.merge(payload,
is_editable: User.can_edit_message?(socket.assigns[:user], payload)
))
{:noreply, socket}
end
handle_out/3
回调必须返回以下内容之一
{:noreply, Socket.t} |
{:noreply, Socket.t, timeout | :hibernate} |
{:stop, reason :: term, Socket.t}
将事件直接发送到已连接的客户端,无需客户端先前的消息。
事件的消息必须是可序列化的映射或标记的 {:binary, data}
元组,其中 data
是二进制数据。
请注意,与某些客户端库不同,此服务器端 push/3
不会返回引用。如果您需要从客户端获取回复并将该回复与您推送的消息相关联,则需要在消息中包含一个唯一的标识符,在频道的状态中跟踪它,让客户端在回复中包含它,并在回复到达 handle_in/3
时检查 ref。
示例
iex> push(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> push(socket, "new_message", {:binary, "hello"})
:ok
@spec reply(socket_ref(), reply()) :: :ok
异步回复套接字推送。
回复客户端消息的通常方法是从 handle_in/3
返回一个元组,例如
{:reply, {status, payload}, socket}
但有时您需要异步回复推送 - 也就是说,在您的 handle_in/3
回调完成之后。例如,您可能需要在另一个进程中执行工作,并在工作完成后回复。
您可以使用 socket_ref/1
为套接字生成引用,并在准备回复时使用该引用调用 reply/2
。
注意:需要 socket_ref
,这样 socket
本身就不会泄露到频道之外。该 socket
包含信息,例如分配和传输配置,因此不要将这些信息复制到拥有它的频道之外非常重要。
从技术上讲,reply/2
允许您多次回复同一个客户端消息,并且每个回复都将包含客户端消息 ref
。但客户端可能只希望收到一个回复;在这种情况下,对于其他消息,push/3
会更好。
使用配置的序列化器在发送之前序列化有效负载。
示例
def handle_in("work", payload, socket) do
Worker.perform(payload, socket_ref(socket))
{:noreply, socket}
end
def handle_info({:work_complete, result, ref}, socket) do
reply(ref, {:ok, result})
{:noreply, socket}
end
@spec socket_ref(Phoenix.Socket.t()) :: socket_ref()
为异步回复生成 socket_ref
。
有关示例用法,请参见 reply/2
。