查看源代码 频道

要求:本指南假定您已阅读 入门指南 并成功启动了一个 Phoenix 应用程序 运行起来

频道是 Phoenix 中激动人心的功能,它支持与数百万个连接的客户端进行软实时通信,以及客户端之间的通信。

一些可能的用例包括

  • 聊天室和用于消息应用程序的 API
  • 突发新闻,例如“进球了”或“地震将要来临”
  • 在地图上跟踪火车、卡车或比赛参与者
  • 多人游戏中发生的事件
  • 监控传感器和控制灯光
  • 通知浏览器页面 CSS 或 JavaScript 已更改(这在开发中非常有用)

从概念上讲,频道非常简单。

首先,客户端使用一些传输(如 WebSocket)连接到服务器。连接后,它们会加入一个或多个主题。例如,要与公共聊天室交互,客户端可能需要加入名为 public_chat 的主题,而要接收来自 ID 为 7 的产品的更新,它们可能需要加入名为 product_updates:7 的主题。

客户端可以向它们加入的主题推送消息,也可以接收来自这些主题的消息。反过来,频道服务器从其连接的客户端接收消息,也可以向它们推送消息。

服务器能够向订阅特定主题的所有客户端广播消息。下图说明了这一点

                                                                  +----------------+
                                                     +--Topic X-->| Mobile Client  |
                                                     |            +----------------+
                              +-------------------+  |
+----------------+            |                   |  |            +----------------+
| Browser Client |--Topic X-->| Phoenix Server(s) |--+--Topic X-->| Desktop Client |
+----------------+            |                   |  |            +----------------+
                              +-------------------+  |
                                                     |            +----------------+
                                                     +--Topic X-->|   IoT Client   |
                                                                  +----------------+

即使应用程序运行在多个节点/计算机上,广播也能正常工作。也就是说,如果两个客户端的套接字连接到不同的应用程序节点,并且都订阅了同一个主题 T,它们都会收到广播到 T 的消息。这是由于内部的 PubSub 机制实现的。

频道可以支持任何类型的客户端:浏览器、原生应用程序、智能手表、嵌入式设备或任何可以连接到网络的设备。客户端只需要一个合适的库;请参阅下面的 客户端库 部分。每个客户端库都使用频道理解的“传输”之一进行通信。目前,是 Websockets 或长轮询,但将来可能会添加其他传输方式。

与无状态的 HTTP 连接不同,频道支持长连接,每个连接都由一个轻量级 BEAM 进程支持,这些进程并行运行并维护自身状态。

这种架构可扩展性好;Phoenix 频道 可以在一台机器上支持数百万个订阅者,并在合理的延迟情况下每秒传递数十万条消息,并且可以通过向集群添加更多节点来提高容量。

活动部件

尽管从客户端的角度来看频道使用起来很简单,但仍有许多组件参与跨服务器集群将消息路由到客户端。让我们看一下它们。

概述

要开始通信,客户端会使用传输(例如 Websockets 或长轮询)连接到节点(Phoenix 服务器),并使用该单个网络连接加入一个或多个频道。每个客户端、每个主题都会创建一个频道服务器轻量级进程。每个频道都保存着 %Phoenix.Socket{},并且可以在其 socket.assigns 中维护所需的任何状态。

连接建立后,来自客户端的每个传入消息都会根据其主题路由到正确的频道服务器。如果频道服务器要求广播消息,该消息将被发送到本地 PubSub,后者会将其发送到连接到同一服务器并订阅该主题的所有客户端。

如果集群中存在其他节点,本地 PubSub 还会将其转发到它们的 PubSub,后者会将其发送到它们自己的订阅者。由于每个附加节点只需要发送一条消息,因此添加节点的性能成本可以忽略不计,而每个新节点都支持更多的订阅者。

消息流看起来像这样

                                 Channel   +-------------------------+      +--------+
                                  route    | Sending Client, Topic 1 |      | Local  |
                              +----------->|     Channel.Server      |----->| PubSub |--+
+----------------+            |            +-------------------------+      +--------+  |
| Sending Client |-Transport--+                                                  |      |
+----------------+                         +-------------------------+           |      |
                                           | Sending Client, Topic 2 |           |      |
                                           |     Channel.Server      |           |      |
                                           +-------------------------+           |      |
                                                                                 |      |
                                           +-------------------------+           |      |
+----------------+                         | Browser Client, Topic 1 |           |      |
| Browser Client |<-------Transport--------|     Channel.Server      |<----------+      |
+----------------+                         +-------------------------+                  |
                                                                                        |
                                                                                        |
                                                                                        |
                                           +-------------------------+                  |
+----------------+                         |  Phone Client, Topic 1  |                  |
|  Phone Client  |<-------Transport--------|     Channel.Server      |<-+               |
+----------------+                         +-------------------------+  |   +--------+  |
                                                                        |   | Remote |  |
                                           +-------------------------+  +---| PubSub |<-+
+----------------+                         |  Watch Client, Topic 1  |  |   +--------+  |
|  Watch Client  |<-------Transport--------|     Channel.Server      |<-+               |
+----------------+                         +-------------------------+                  |
                                                                                        |
                                                                                        |
                                           +-------------------------+      +--------+  |
+----------------+                         |   IoT Client, Topic 1   |      | Remote |  |
|   IoT Client   |<-------Transport--------|     Channel.Server      |<-----| PubSub |<-+
+----------------+                         +-------------------------+      +--------+

端点

在您的 Phoenix 应用程序的 Endpoint 模块中,socket 声明指定了哪个套接字处理程序将在给定的 URL 上接收连接。

socket "/socket", HelloWeb.UserSocket,
  websocket: true,
  longpoll: false

Phoenix 带有两个默认的传输方式:websocket 和 longpoll。您可以通过 socket 声明直接配置它们。

套接字处理程序

在客户端,您将建立一个套接字连接到上面的路由

let socket = new Socket("/socket", {params: {token: window.userToken}})

在服务器端,Phoenix 将调用 HelloWeb.UserSocket.connect/2,传递您的参数和初始套接字状态。在套接字中,您可以验证和标识套接字连接,并设置默认的套接字分配。您也可以在套接字中定义频道路由。

频道路由

频道路由匹配主题字符串,并将匹配的请求调度到给定的 Channel 模块。

星号字符 * 充当通配符匹配器,因此在以下示例路由中,对 room:lobbyroom:123 的请求都将被调度到 RoomChannel。在您的 UserSocket 中,您将拥有

channel "room:*", HelloWeb.RoomChannel

频道

频道处理来自客户端的事件,因此它们类似于控制器,但有两个主要区别。频道事件可以双向进行 - 传入和传出。频道连接也持久存在于单个请求/响应周期之外。频道是 Phoenix 中用于实时通信组件的最高级抽象。

每个频道将实现这四个回调函数中的一个或多个子句 - join/3terminate/2handle_in/3handle_out/3

主题

主题是字符串标识符 - 各个层使用这些名称来确保消息最终到达正确的位置。正如我们在上面看到的,主题可以使用通配符。这允许使用有用的 "topic:subtopic" 约定。通常,您将使用来自应用程序层的记录 ID 来组合主题,例如 "users:123"

消息

Phoenix.Socket.Message 模块定义了一个带有以下键的结构体,这些键表示有效消息。来自 Phoenix.Socket.Message 文档

  • topic - 字符串主题或 "topic:subtopic" 对命名空间,例如 "messages""messages:123"
  • event - 字符串事件名称,例如 "phx_join"
  • payload - 消息有效负载
  • ref - 唯一的字符串引用

PubSub

PubSub 由 Phoenix.PubSub 模块提供。感兴趣的方可以通过订阅主题来接收事件。其他进程可以将事件广播到特定主题。

这对在频道上广播消息以及一般应用程序开发都很有用。例如,让所有连接的 实时视图 知道已在帖子中添加了新评论。

PubSub 系统负责将消息从一个节点传递到另一个节点,以便可以将这些消息发送到整个集群中的所有订阅者。默认情况下,这是使用 Phoenix.PubSub.PG2 完成的,后者使用本机 BEAM 消息传递。

如果您的部署环境不支持分布式 Elixir 或服务器之间的直接通信,Phoenix 还附带了 Redis 适配器,它使用 Redis 交换 PubSub 数据。有关更多信息,请参阅 Phoenix.PubSub 文档

客户端库

只要有客户端库,任何网络设备都可以连接到 Phoenix 频道。目前存在以下库,并且始终欢迎新的库;要编写您自己的库,请参阅我们的操作指南 编写频道客户端

官方

Phoenix 带有一个 JavaScript 客户端,它在生成新的 Phoenix 项目时可用。JavaScript 模块的文档位于 https://hexdocs.erlang.ac.cn/phoenix/js/;代码位于 多个 js 文件中

第三方

将所有内容整合在一起

让我们通过构建一个简单的聊天应用程序来将所有这些想法整合在一起。请确保您 创建了一个新的 Phoenix 应用程序,现在我们准备生成 UserSocket

生成套接字

让我们调用套接字生成器来开始

$ mix phx.gen.socket User

它将创建两个文件:客户端代码位于 assets/js/user_socket.js 中,服务器对应代码位于 lib/hello_web/channels/user_socket.ex 中。运行后,生成器还会要求将以下行添加到 lib/hello_web/endpoint.ex

defmodule HelloWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :hello

  socket "/socket", HelloWeb.UserSocket,
    websocket: true,
    longpoll: false

  ...
end

生成器还会要求我们导入客户端代码,我们稍后会这样做。

接下来,我们将配置套接字以确保消息被路由到正确的频道。为此,我们将取消注释 "room:*" 频道定义

defmodule HelloWeb.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "room:*", HelloWeb.RoomChannel
  ...

现在,每当客户端发送主题以 "room:" 开头的消息时,该消息将被路由到我们的 RoomChannel。接下来,我们将定义一个 HelloWeb.RoomChannel 模块来管理我们的聊天室消息。

加入频道

您频道的首要任务是授权客户端加入给定的主题。为了授权,我们必须在 lib/hello_web/channels/room_channel.ex 中实现 join/3

defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:lobby", _message, socket) do
    {:ok, socket}
  end

  def join("room:" <> _private_room_id, _params, _socket) do
    {:error, %{reason: "unauthorized"}}
  end
end

对于我们的聊天应用程序,我们将允许任何人加入 "room:lobby" 主题,但任何其他房间都将被视为私有,并且需要特殊的授权,例如来自数据库的授权。(我们不会在本练习中考虑私人聊天室,但请随时在我们完成后探索。)

有了我们的频道,让我们让客户端和服务器开始通信。

生成的 assets/js/user_socket.js 定义了一个基于 Phoenix 附带的套接字实现的简单客户端。

我们可以使用该库连接到套接字并加入频道,我们只需要在该文件中将房间名称设置为 "room:lobby"

// assets/js/user_socket.js
// ...
socket.connect()

// Now that you are connected, you can join channels with a topic:
let channel = socket.channel("room:lobby", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

之后,我们需要确保 assets/js/user_socket.js 被导入到我们的应用程序 JavaScript 文件中。为此,请在 assets/js/app.js 中取消注释此行。

// ...
import "./user_socket.js"

保存文件,您的浏览器应该会自动刷新,这得益于 Phoenix 的实时重新加载器。如果一切正常,我们应该在浏览器的 JavaScript 控制台中看到“Joined successfully”。我们的客户端和服务器现在通过持久连接进行通信。现在让我们通过启用聊天功能使其变得有用。

lib/hello_web/controllers/page_html/home.html.heex 中,我们将用一个容器替换现有代码来容纳我们的聊天消息,以及一个输入字段来发送它们

<div id="messages" role="log" aria-live="polite"></div>
<input id="chat-input" type="text">

现在让我们为 assets/js/user_socket.js 添加几个事件监听器

// ...
let channel           = socket.channel("room:lobby", {})
let chatInput         = document.querySelector("#chat-input")
let messagesContainer = document.querySelector("#messages")

chatInput.addEventListener("keypress", event => {
  if(event.key === 'Enter'){
    channel.push("new_msg", {body: chatInput.value})
    chatInput.value = ""
  }
})

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

我们所要做的就是检测是否按下了回车键,然后通过通道以消息主体 push 一个事件。我们将事件命名为 "new_msg"。有了这个,让我们处理聊天应用程序的另一部分,在这里我们监听新消息并将其附加到我们的消息容器中。

// ...
let channel           = socket.channel("room:lobby", {})
let chatInput         = document.querySelector("#chat-input")
let messagesContainer = document.querySelector("#messages")

chatInput.addEventListener("keypress", event => {
  if(event.key === 'Enter'){
    channel.push("new_msg", {body: chatInput.value})
    chatInput.value = ""
  }
})

channel.on("new_msg", payload => {
  let messageItem = document.createElement("p")
  messageItem.innerText = `[${Date()}] ${payload.body}`
  messagesContainer.appendChild(messageItem)
})

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

我们使用 channel.on 监听 "new_msg" 事件,然后将消息主体附加到 DOM。现在让我们处理服务器上的传入和传出事件来完成这幅画面。

传入事件

我们使用 handle_in/3 处理传入事件。我们可以对事件名称进行模式匹配,比如 "new_msg",然后获取客户端通过通道传递的有效负载。对于我们的聊天应用程序,我们只需要使用 broadcast!/3 通知所有其他 room:lobby 订阅者新消息即可。

defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:lobby", _message, socket) do
    {:ok, socket}
  end

  def join("room:" <> _private_room_id, _params, _socket) do
    {:error, %{reason: "unauthorized"}}
  end

  def handle_in("new_msg", %{"body" => body}, socket) do
    broadcast!(socket, "new_msg", %{body: body})
    {:noreply, socket}
  end
end

broadcast!/3 将通知此 socket 主题上的所有已加入客户端,并调用它们的 handle_out/3 回调。 handle_out/3 不是必需的回调,但它允许我们在广播到达每个客户端之前对其进行自定义和过滤。默认情况下, handle_out/3 是为我们实现的,它只是将消息推送到客户端。挂钩到传出事件允许强大的消息自定义和过滤。让我们看看如何实现。

拦截传出事件

我们不会为我们的应用程序实现这一点,但假设我们的聊天应用程序允许用户忽略有关新用户加入房间的消息。我们可以像这样实现这种行为,其中我们明确地告诉 Phoenix 我们要拦截哪个传出事件,然后为这些事件定义一个 handle_out/3 回调。(当然,这假设我们有一个 Accounts 上下文,其中包含一个 ignoring_user?/2 函数,并且我们通过 assigns 地图传递了一个用户)。重要的是要注意, handle_out/3 回调将被调用以用于消息的每个接收者,因此在将更昂贵的操作(如命中数据库)包含在 handle_out/3 中之前,应仔细考虑这些操作。

intercept ["user_joined"]

def handle_out("user_joined", msg, socket) do
  if Accounts.ignoring_user?(socket.assigns[:user], msg.user_id) do
    {:noreply, socket}
  else
    push(socket, "user_joined", msg)
    {:noreply, socket}
  end
end

这就是我们的基本聊天应用程序的所有内容。启动多个浏览器标签,您应该会看到您的消息被推送到所有窗口并广播到所有窗口!

使用令牌身份验证

当我们连接时,我们通常需要对客户端进行身份验证。幸运的是,这是一个使用 Phoenix.Token 的 4 步过程。

步骤 1 - 在连接中分配令牌

假设我们在应用程序中有一个名为 OurAuth 的身份验证插件。当 OurAuth 认证用户时,它会在 conn.assigns 中为 :current_user 键设置一个值。由于 current_user 存在,我们可以简单地在连接中分配用户的令牌,以便在布局中使用。我们可以将此行为包装在一个私有函数插件中, put_user_token/2。这也可以放在它自己的模块中。为了使这一切正常工作,我们只需将 OurAuthput_user_token/2 添加到浏览器管道即可。

pipeline :browser do
  ...
  plug OurAuth
  plug :put_user_token
end

defp put_user_token(conn, _) do
  if current_user = conn.assigns[:current_user] do
    token = Phoenix.Token.sign(conn, "user socket", current_user.id)
    assign(conn, :user_token, token)
  else
    conn
  end
end

现在我们的 conn.assigns 包含 current_useruser_token

步骤 2 - 将令牌传递给 JavaScript

接下来,我们需要将此令牌传递给 JavaScript。我们可以在 lib/hello_web/components/layouts/app.html.heex 中的脚本标签内,紧接在 app.js 脚本上方执行此操作,如下所示

<script>window.userToken = "<%= assigns[:user_token] %>";</script>
<script src={~p"/assets/app.js"}></script>

步骤 3 - 将令牌传递给 Socket 构造函数并进行验证

我们还需要将 :params 传递给 Socket 构造函数,并在 connect/3 函数中验证用户令牌。为此,请编辑 lib/hello_web/channels/user_socket.ex,如下所示

def connect(%{"token" => token}, socket, _connect_info) do
  # max_age: 1209600 is equivalent to two weeks in seconds
  case Phoenix.Token.verify(socket, "user socket", token, max_age: 1209600) do
    {:ok, user_id} ->
      {:ok, assign(socket, :current_user, user_id)}
    {:error, reason} ->
      :error
  end
end

在我们的 JavaScript 中,我们可以在构造 Socket 时使用之前设置的令牌

let socket = new Socket("/socket", {params: {token: window.userToken}})

我们使用 Phoenix.Token.verify/4 来验证客户端提供的用户令牌。 Phoenix.Token.verify/4 返回 {:ok, user_id}{:error, reason}。我们可以在 case 语句中对该返回值进行模式匹配。使用经过验证的令牌,我们将用户的 ID 设置为 Socket 中 :current_user 的值。否则,我们将返回 :error

步骤 4 - 在 JavaScript 中连接到 Socket

设置身份验证后,我们可以从 JavaScript 连接到 Socket 和通道。

let socket = new Socket("/socket", {params: {token: window.userToken}})
socket.connect()

现在我们已经连接,我们可以使用主题加入通道

let channel = socket.channel("topic:subtopic", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

请注意,令牌身份验证是首选方法,因为它与传输无关,非常适合长时间运行的连接(如通道),而不是使用会话或其他身份验证方法。

容错和可靠性保证

服务器会重启,网络会断开,客户端会失去连接。为了设计健壮的系统,我们需要了解 Phoenix 如何响应这些事件以及它提供的哪些保证。

处理重新连接

客户端订阅主题,Phoenix 将这些订阅存储在内存中的 ETS 表中。如果通道崩溃,客户端将需要重新连接到他们之前订阅的主题。幸运的是,Phoenix JavaScript 客户端知道如何做到这一点。服务器将通知所有客户端崩溃情况。这将触发每个客户端的 Channel.onError 回调。客户端将尝试使用指数退避策略重新连接到服务器。重新连接后,它们将尝试重新加入它们之前订阅的主题。如果成功,它们将像以前一样开始接收来自这些主题的消息。

重新发送客户端消息

通道客户端将传出消息排队到 PushBuffer 中,并在有连接时将其发送到服务器。如果连接不可用,客户端将保留消息,直到它可以建立新连接。在没有连接的情况下,客户端将保留消息,直到它建立连接,或者直到它接收到 timeout 事件。默认超时设置为 5000 毫秒。客户端不会将消息持久化到浏览器的本地存储中,因此如果浏览器标签关闭,消息将消失。

重新发送服务器消息

Phoenix 在将消息发送到客户端时使用最多一次策略。如果客户端处于离线状态并错过了消息,Phoenix 不会重新发送它。Phoenix 不会将消息持久化到服务器上。如果服务器重启,未发送的消息将消失。如果我们的应用程序需要围绕消息传递更强的保证,我们需要自己编写代码。常见的方法包括将消息持久化到服务器上,并让客户端请求缺少的消息。例如,请参阅 Chris McCord 的 Phoenix 培训:客户端代码服务器代码

示例应用程序

要查看我们刚刚构建的应用程序示例,请查看项目 phoenix_chat_example