查看源代码 Phoenix.ChannelTest (Phoenix v1.7.14)

用于测试 Phoenix 通道的便利函数。

在通道测试中,我们通过进程通信与通道交互,发送和接收消息。 通常还会订阅通道订阅的相同主题,这样就可以断言是否广播了给定消息。

通道测试

要开始,请在测试用例中定义模块属性 @endpoint,指向您的应用程序端点。

然后您可以直接创建一个套接字,并使用 subscribe_and_join/4 来订阅主题和通道

{:ok, _, socket} =
  socket(UserSocket, "user:id", %{some_assigns: 1})
  |> subscribe_and_join(RoomChannel, "room:lobby", %{"id" => 3})

通常,您希望设置与 UserSocket.connect/3 回调设置的相同的 ID 和分配。 或者,您可以使用 connect/3 帮助程序来调用您的 UserSocket.connect/3 回调,并使用套接字 ID 初始化套接字

{:ok, socket} = connect(UserSocket, %{"some" => "params"}, %{})
{:ok, _, socket} = subscribe_and_join(socket, "room:lobby", %{"id" => 3})

调用后,subscribe_and_join/4 将当前测试进程订阅到 “room:lobby” 主题,并在另一个进程中启动一个通道。 它返回 {:ok, reply, socket}{:error, reply}

现在,通道有一个代表它将推送到客户端的通信的套接字,我们的测试有一个代表将推送到服务器的通信的套接字。

例如,我们可以使用测试中的 push/3 函数将消息推送到通道(它将调用 handle_in/3

push(socket, "my_event", %{"some" => "data"})

同样,我们可以从测试本身广播主题上的消息,该主题由测试和通道都订阅,从而触发通道上的 handle_out/3

broadcast_from(socket, "my_event", %{"some" => "data"})

注意,只有 broadcast_from/3broadcast_from!/3 在测试中可用,以避免将广播消息重新发送到测试进程。

虽然上面的函数将数据推送到通道(服务器),但我们可以使用 assert_push/3 来验证通道是否将消息推送到客户端

assert_push "my_event", %{"some" => "data"}

或者甚至断言某件事被广播到 pubsub 中

assert_broadcast "my_event", %{"some" => "data"}

最后,每次将消息推送到通道时,都会返回一个引用。 我们可以使用此引用来断言服务器发送了特定的回复

ref = push(socket, "counter", %{})
assert_reply ref, :ok, %{"counter" => 1}

检查副作用

通常人们可能希望在通道内部执行副作用,例如写入数据库,并在测试期间验证这些副作用。

想象一下通道内部的以下 handle_in/3

def handle_in("publish", %{"id" => id}, socket) do
  Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
  {:noreply, socket}
end

由于整个通信都是异步的,因此以下测试将非常脆弱

push(socket, "publish", %{"id" => 3})
assert Repo.get_by(Post, id: 3, published: true)

问题是,我们无法保证通道在调用 push/3 后已完成处理我们的消息。 最好的解决方案是在执行任何其他断言之前断言通道发送了回复。 首先更改通道以发送回复

def handle_in("publish", %{"id" => id}, socket) do
  Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
  {:reply, :ok, socket}
end

然后在测试中预期它们

ref = push(socket, "publish", %{"id" => 3})
assert_reply ref, :ok
assert Repo.get_by(Post, id: 3, published: true)

离开和关闭

此模块还提供用于模拟离开和关闭通道的函数。 一旦您离开或关闭通道,由于通道在加入时与测试进程链接,它将使测试进程崩溃

leave(socket)
** (EXIT from #PID<...>) {:shutdown, :leave}

您可以通过在测试中取消链接通道进程来避免这种情况

Process.unlink(socket.channel_pid)

注意 leave/1 是异步的,因此它还将返回一个引用,您可以使用它来检查回复

ref = leave(socket)
assert_reply ref, :ok

另一方面,关闭总是同步的,它只在通道进程保证已终止后才会返回

:ok = close(socket)

这模仿了客户端中存在的行为。

要断言您的通道异步关闭或出错,您可以使用 Elixir 提供的工具监控通道进程,并等待 :DOWN 消息。 想象一下 handle_info/2 函数的实现,该函数在收到 :some_message 时关闭通道

def handle_info(:some_message, socket) do
  {:stop, :normal, socket}
end

在您的测试中,您可以断言关闭已发生

Process.monitor(socket.channel_pid)
send(socket.channel_pid, :some_message)
assert_receive {:DOWN, _, _, _, :normal}

摘要

函数

断言通道在 timeout 内向客户端推送了具有给定事件和有效负载的消息。

从 pid 广播套接字主题的所有订阅者的事件。

broadcast_from/3 相同,但如果广播失败则会抛出异常。

模拟客户端关闭套接字。

为套接字处理程序启动传输连接。

加入给定主题和有效负载下的通道。

模拟客户端离开通道。

将消息推送到通道中。

断言通道在 timeout 内没有向客户端推送与给定事件和有效负载匹配的消息。

为给定的 socket_module 构建一个套接字。

为给定的 socket_module 构建一个套接字,使用给定的 id 和分配。

订阅给定主题,并在给定主题和有效负载下加入通道。

subscribe_and_join/4 相同,但返回套接字或抛出错误。

函数

链接到此宏

assert_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))

查看源代码 (宏)

断言通道在 timeout 内广播了消息。

在断言任何内容被广播之前,我们必须首先在测试进程中订阅通道的主题

@endpoint.subscribe("foo:ok")

现在我们可以将事件和有效负载匹配为模式

assert_broadcast "some_event", %{"data" => _}

在上面的断言中,我们并不特别关心发送的数据,只要发送了任何数据即可。

超时以毫秒为单位,默认值为 :ex_unit 应用程序上设置的 :assert_receive_timeout(默认值为 100 毫秒)。

链接到此宏

assert_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))

查看源代码 (宏)

断言通道在 timeout 内向客户端推送了具有给定事件和有效负载的消息。

注意事件和有效负载是模式。 这意味着人们可以编写

assert_push "some_event", %{"data" => _}

在上面的断言中,我们并不特别关心发送的数据,只要发送了任何数据即可。

超时以毫秒为单位,默认值为 :ex_unit 应用程序上设置的 :assert_receive_timeout(默认值为 100 毫秒)。

**注意:** 由于事件和有效负载是模式,因此它们将被匹配。 这意味着,如果您希望断言收到的有效负载等效于现有变量,则需要在断言表达式中固定该变量。

好的

expected_payload = %{foo: "bar"}
assert_push "some_event", ^expected_payload

不好的

expected_payload = %{foo: "bar"}
assert_push "some_event", expected_payload
# The code above does not assert the payload matches the described map.
链接到此宏

assert_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))

查看源代码 (宏)

断言通道在 timeout 内回复了给定消息。

注意状态和有效负载是模式。 这意味着人们可以编写

ref = push(channel, "some_event")
assert_reply ref, :ok, %{"data" => _}

在上面的断言中,我们并不特别关心发送的数据,只要发送了任何回复即可。

超时以毫秒为单位,默认值为 :ex_unit 应用程序上设置的 :assert_receive_timeout(默认值为 100 毫秒)。

链接到此函数

broadcast_from(socket, event, message)

查看源代码

从 pid 广播套接字主题的所有订阅者的事件。

测试进程将不会收到发布的消息。 这将触发通道中的 handle_out/3 回调。

示例

iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
链接到此函数

broadcast_from!(socket, event, message)

查看源代码

broadcast_from/3 相同,但如果广播失败则会抛出异常。

链接到此函数

close(socket, timeout \\ 5000)

查看源代码

模拟客户端关闭套接字。

关闭套接字是同步的,默认超时为 5000 毫秒。

链接到此宏

connect(handler, params, options \\ quote do [] end)

查看源代码 (宏)

为套接字处理程序启动传输连接。

对于测试 UserSocket 身份验证很有用。 返回处理程序的 connect/3 回调的结果。

参见 join/4

链接到此函数

join(socket, topic, payload)

查看源代码

参见 join/4

链接到此函数

join(socket, channel, topic, payload \\ %{})

查看源代码

加入给定主题和有效负载下的通道。

给定的通道在与测试进程链接的单独进程中加入。

它返回 {:ok, reply, socket}{:error, reply}

@spec leave(Phoenix.Socket.t()) :: reference()

模拟客户端离开通道。

链接到此函数

push(socket, event, payload \\ %{})

查看源代码
@spec push(Phoenix.Socket.t(), String.t(), map()) :: reference()

将消息推送到通道中。

这将触发通道中的 handle_in/3 回调。

示例

iex> push(socket, "new_message", %{id: 1, content: "hello"})
reference
链接到此宏

refute_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))

查看源代码 (宏)

断言通道在 timeout 内没有广播消息。

assert_broadcast 一样,事件和有效负载是模式。

超时以毫秒为单位,默认值为 :ex_unit 应用程序上设置的 :refute_receive_timeout(默认值为 100 毫秒)。 请记住,此宏将按超时值阻塞测试,因此仅在必要时使用它,因为过度使用肯定会减慢您的测试套件速度。

链接到此宏

refute_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))

查看源代码 (宏)

断言通道在 timeout 内没有向客户端推送与给定事件和有效负载匹配的消息。

assert_push 一样,事件和有效负载是模式。

超时以毫秒为单位,默认值为 :ex_unit 应用程序上设置的 :refute_receive_timeout(默认值为 100 毫秒)。 请记住,此宏将按超时值阻塞测试,因此仅在必要时使用它,因为过度使用肯定会减慢您的测试套件速度。

链接到此宏

refute_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))

查看源代码 (宏)

断言通道在 timeout 内没有回复与匹配的有效负载。

assert_reply 一样,事件和有效负载是模式。

超时以毫秒为单位,默认值为 :ex_unit 应用程序上设置的 :refute_receive_timeout(默认值为 100 毫秒)。 请记住,此宏将按超时值阻塞测试,因此仅在必要时使用它,因为过度使用肯定会减慢您的测试套件速度。

链接到此宏

socket(socket_module)

查看源代码 (宏)

为给定的 socket_module 构建一个套接字。

然后,套接字用于订阅和加入频道。当您想要创建一个空白套接字并传递给诸如 UserSocket.connect/3 之类的函数时,请使用此函数。

否则,如果您想使用现有 ID 和分配来构建套接字,请使用 socket/4

示例

socket(MyApp.UserSocket)
链接到此宏

socket(socket_module, socket_id, socket_assigns, options \\ [])

查看源代码 (宏)

为给定的 socket_module 构建一个套接字,使用给定的 id 和分配。

示例

socket(MyApp.UserSocket, "user_id", %{some: :assign})

如果您需要在测试进程以外的另一个进程中访问套接字,可以在第四个参数中提供测试进程的 pid

示例

test "connect in a task" do
  pid = self()
  task = Task.async(fn -> 
    socket = socket(MyApp.UserSocket, "user_id", %{some: :assign}, test_process: pid)
    broadcast_from!(socket, "default", %{"foo" => "bar"})
    assert_push "default", %{"foo" => "bar"}
  end)
  Task.await(task)
end
链接到此函数

subscribe_and_join(socket, topic)

查看源代码

参见 subscribe_and_join/4

链接到此函数

subscribe_and_join(socket, topic, payload)

查看源代码

参见 subscribe_and_join/4

链接到此函数

subscribe_and_join(socket, channel, topic, payload \\ %{})

查看源代码

订阅给定主题,并在给定主题和有效负载下加入通道。

通过订阅主题,我们可以使用 assert_broadcast/3 来验证消息是否已通过发布-订阅层发送。

通过加入频道,我们可以直接与之交互。给定的频道在一个与测试进程关联的单独进程中加入。

如果没有提供频道模块,套接字的处理程序将用于查找与给定主题匹配的频道。

它返回 {:ok, reply, socket}{:error, reply}

链接到此函数

subscribe_and_join!(socket, topic)

查看源代码

参见 subscribe_and_join!/4

链接到此函数

subscribe_and_join!(socket, topic, payload)

查看源代码

参见 subscribe_and_join!/4

链接到此函数

subscribe_and_join!(socket, channel, topic, payload \\ %{})

查看源代码

subscribe_and_join/4 相同,但返回套接字或抛出错误。

当您没有测试加入频道并且只需要套接字时,这很有用。