查看源代码 Phoenix.ChannelTest (Phoenix v1.7.14)
用于测试 Phoenix 通道的便利函数。
在通道测试中,我们通过进程通信与通道交互,发送和接收消息。 通常还会订阅通道订阅的相同主题,这样就可以断言是否广播了给定消息。
通道测试
要开始,请在测试用例中定义模块属性 @endpoint
,指向您的应用程序端点。
然后您可以直接创建一个套接字,并使用 subscribe_and_join/4
来订阅主题和通道
{:ok, _, socket} =
socket(UserSocket, "user:id", %{some_assigns: 1})
|> subscribe_and_join(RoomChannel, "room:lobby", %{"id" => 3})
通常,您希望设置与 UserSocket.connect/3
回调设置的相同的 ID 和分配。 或者,您可以使用 connect/3
帮助程序来调用您的 UserSocket.connect/3
回调,并使用套接字 ID 初始化套接字
{:ok, socket} = connect(UserSocket, %{"some" => "params"}, %{})
{:ok, _, socket} = subscribe_and_join(socket, "room:lobby", %{"id" => 3})
调用后,subscribe_and_join/4
将当前测试进程订阅到 “room:lobby” 主题,并在另一个进程中启动一个通道。 它返回 {:ok, reply, socket}
或 {:error, reply}
。
现在,通道有一个代表它将推送到客户端的通信的套接字,我们的测试有一个代表将推送到服务器的通信的套接字。
例如,我们可以使用测试中的 push/3
函数将消息推送到通道(它将调用 handle_in/3
)
push(socket, "my_event", %{"some" => "data"})
同样,我们可以从测试本身广播主题上的消息,该主题由测试和通道都订阅,从而触发通道上的 handle_out/3
broadcast_from(socket, "my_event", %{"some" => "data"})
注意,只有
broadcast_from/3
和broadcast_from!/3
在测试中可用,以避免将广播消息重新发送到测试进程。
虽然上面的函数将数据推送到通道(服务器),但我们可以使用 assert_push/3
来验证通道是否将消息推送到客户端
assert_push "my_event", %{"some" => "data"}
或者甚至断言某件事被广播到 pubsub 中
assert_broadcast "my_event", %{"some" => "data"}
最后,每次将消息推送到通道时,都会返回一个引用。 我们可以使用此引用来断言服务器发送了特定的回复
ref = push(socket, "counter", %{})
assert_reply ref, :ok, %{"counter" => 1}
检查副作用
通常人们可能希望在通道内部执行副作用,例如写入数据库,并在测试期间验证这些副作用。
想象一下通道内部的以下 handle_in/3
def handle_in("publish", %{"id" => id}, socket) do
Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
{:noreply, socket}
end
由于整个通信都是异步的,因此以下测试将非常脆弱
push(socket, "publish", %{"id" => 3})
assert Repo.get_by(Post, id: 3, published: true)
问题是,我们无法保证通道在调用 push/3
后已完成处理我们的消息。 最好的解决方案是在执行任何其他断言之前断言通道发送了回复。 首先更改通道以发送回复
def handle_in("publish", %{"id" => id}, socket) do
Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
{:reply, :ok, socket}
end
然后在测试中预期它们
ref = push(socket, "publish", %{"id" => 3})
assert_reply ref, :ok
assert Repo.get_by(Post, id: 3, published: true)
离开和关闭
此模块还提供用于模拟离开和关闭通道的函数。 一旦您离开或关闭通道,由于通道在加入时与测试进程链接,它将使测试进程崩溃
leave(socket)
** (EXIT from #PID<...>) {:shutdown, :leave}
您可以通过在测试中取消链接通道进程来避免这种情况
Process.unlink(socket.channel_pid)
注意 leave/1
是异步的,因此它还将返回一个引用,您可以使用它来检查回复
ref = leave(socket)
assert_reply ref, :ok
另一方面,关闭总是同步的,它只在通道进程保证已终止后才会返回
:ok = close(socket)
这模仿了客户端中存在的行为。
要断言您的通道异步关闭或出错,您可以使用 Elixir 提供的工具监控通道进程,并等待 :DOWN
消息。 想象一下 handle_info/2
函数的实现,该函数在收到 :some_message
时关闭通道
def handle_info(:some_message, socket) do
{:stop, :normal, socket}
end
在您的测试中,您可以断言关闭已发生
Process.monitor(socket.channel_pid)
send(socket.channel_pid, :some_message)
assert_receive {:DOWN, _, _, _, :normal}
摘要
函数
断言通道在 timeout
内广播了消息。
断言通道在 timeout
内向客户端推送了具有给定事件和有效负载的消息。
断言通道在 timeout
内回复了给定消息。
从 pid 广播套接字主题的所有订阅者的事件。
与 broadcast_from/3
相同,但如果广播失败则会抛出异常。
模拟客户端关闭套接字。
为套接字处理程序启动传输连接。
加入给定主题和有效负载下的通道。
模拟客户端离开通道。
将消息推送到通道中。
断言通道在 timeout
内没有广播消息。
断言通道在 timeout
内没有向客户端推送与给定事件和有效负载匹配的消息。
断言通道在 timeout
内没有回复与匹配的有效负载。
为给定的 socket_module
构建一个套接字。
为给定的 socket_module
构建一个套接字,使用给定的 id 和分配。
订阅给定主题,并在给定主题和有效负载下加入通道。
函数
assert_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))
查看源代码 (宏)断言通道在 timeout
内广播了消息。
在断言任何内容被广播之前,我们必须首先在测试进程中订阅通道的主题
@endpoint.subscribe("foo:ok")
现在我们可以将事件和有效负载匹配为模式
assert_broadcast "some_event", %{"data" => _}
在上面的断言中,我们并不特别关心发送的数据,只要发送了任何数据即可。
超时以毫秒为单位,默认值为 :ex_unit
应用程序上设置的 :assert_receive_timeout
(默认值为 100 毫秒)。
assert_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))
查看源代码 (宏)断言通道在 timeout
内向客户端推送了具有给定事件和有效负载的消息。
注意事件和有效负载是模式。 这意味着人们可以编写
assert_push "some_event", %{"data" => _}
在上面的断言中,我们并不特别关心发送的数据,只要发送了任何数据即可。
超时以毫秒为单位,默认值为 :ex_unit
应用程序上设置的 :assert_receive_timeout
(默认值为 100 毫秒)。
**注意:** 由于事件和有效负载是模式,因此它们将被匹配。 这意味着,如果您希望断言收到的有效负载等效于现有变量,则需要在断言表达式中固定该变量。
好的
expected_payload = %{foo: "bar"}
assert_push "some_event", ^expected_payload
不好的
expected_payload = %{foo: "bar"}
assert_push "some_event", expected_payload
# The code above does not assert the payload matches the described map.
assert_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))
查看源代码 (宏)断言通道在 timeout
内回复了给定消息。
注意状态和有效负载是模式。 这意味着人们可以编写
ref = push(channel, "some_event")
assert_reply ref, :ok, %{"data" => _}
在上面的断言中,我们并不特别关心发送的数据,只要发送了任何回复即可。
超时以毫秒为单位,默认值为 :ex_unit
应用程序上设置的 :assert_receive_timeout
(默认值为 100 毫秒)。
从 pid 广播套接字主题的所有订阅者的事件。
测试进程将不会收到发布的消息。 这将触发通道中的 handle_out/3
回调。
示例
iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
与 broadcast_from/3
相同,但如果广播失败则会抛出异常。
模拟客户端关闭套接字。
关闭套接字是同步的,默认超时为 5000 毫秒。
为套接字处理程序启动传输连接。
对于测试 UserSocket 身份验证很有用。 返回处理程序的 connect/3
回调的结果。
参见 join/4
。
参见 join/4
。
加入给定主题和有效负载下的通道。
给定的通道在与测试进程链接的单独进程中加入。
它返回 {:ok, reply, socket}
或 {:error, reply}
。
@spec leave(Phoenix.Socket.t()) :: reference()
模拟客户端离开通道。
@spec push(Phoenix.Socket.t(), String.t(), map()) :: reference()
将消息推送到通道中。
这将触发通道中的 handle_in/3
回调。
示例
iex> push(socket, "new_message", %{id: 1, content: "hello"})
reference
refute_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))
查看源代码 (宏)断言通道在 timeout
内没有广播消息。
与 assert_broadcast
一样,事件和有效负载是模式。
超时以毫秒为单位,默认值为 :ex_unit
应用程序上设置的 :refute_receive_timeout
(默认值为 100 毫秒)。 请记住,此宏将按超时值阻塞测试,因此仅在必要时使用它,因为过度使用肯定会减慢您的测试套件速度。
refute_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))
查看源代码 (宏)断言通道在 timeout
内没有向客户端推送与给定事件和有效负载匹配的消息。
与 assert_push
一样,事件和有效负载是模式。
超时以毫秒为单位,默认值为 :ex_unit
应用程序上设置的 :refute_receive_timeout
(默认值为 100 毫秒)。 请记住,此宏将按超时值阻塞测试,因此仅在必要时使用它,因为过度使用肯定会减慢您的测试套件速度。
refute_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))
查看源代码 (宏)断言通道在 timeout
内没有回复与匹配的有效负载。
与 assert_reply
一样,事件和有效负载是模式。
超时以毫秒为单位,默认值为 :ex_unit
应用程序上设置的 :refute_receive_timeout
(默认值为 100 毫秒)。 请记住,此宏将按超时值阻塞测试,因此仅在必要时使用它,因为过度使用肯定会减慢您的测试套件速度。
为给定的 socket_module
构建一个套接字。
然后,套接字用于订阅和加入频道。当您想要创建一个空白套接字并传递给诸如 UserSocket.connect/3
之类的函数时,请使用此函数。
否则,如果您想使用现有 ID 和分配来构建套接字,请使用 socket/4
。
示例
socket(MyApp.UserSocket)
为给定的 socket_module
构建一个套接字,使用给定的 id 和分配。
示例
socket(MyApp.UserSocket, "user_id", %{some: :assign})
如果您需要在测试进程以外的另一个进程中访问套接字,可以在第四个参数中提供测试进程的 pid
。
示例
test "connect in a task" do
pid = self()
task = Task.async(fn ->
socket = socket(MyApp.UserSocket, "user_id", %{some: :assign}, test_process: pid)
broadcast_from!(socket, "default", %{"foo" => "bar"})
assert_push "default", %{"foo" => "bar"}
end)
Task.await(task)
end
订阅给定主题,并在给定主题和有效负载下加入通道。
通过订阅主题,我们可以使用 assert_broadcast/3
来验证消息是否已通过发布-订阅层发送。
通过加入频道,我们可以直接与之交互。给定的频道在一个与测试进程关联的单独进程中加入。
如果没有提供频道模块,套接字的处理程序将用于查找与给定主题匹配的频道。
它返回 {:ok, reply, socket}
或 {:error, reply}
。
与 subscribe_and_join/4
相同,但返回套接字或抛出错误。
当您没有测试加入频道并且只需要套接字时,这很有用。