查看源代码 Task (Elixir v1.16.2)
用于生成和等待任务的便捷方法。
任务是指旨在在其整个生命周期中执行特定操作的进程,通常很少或根本不与其他进程通信。任务最常见的用例是将顺序代码转换为并发代码,方法是异步计算值。
task = Task.async(fn -> do_some_work() end)
res = do_some_other_work()
res + Task.await(task)
使用 async
生成的任务可以通过其调用者进程(且仅限于其调用者)等待,如上面的示例所示。它们的实现方法是生成一个进程,该进程在执行完给定的计算后会向调用者发送消息。
与使用 spawn/1
启动的普通进程相比,任务包含监视元数据和错误时的日志记录。
除了 async/1
和 await/2
之外,任务也可以作为监督树的一部分启动,并在远程节点上动态生成。我们将在下一节中探讨这些场景。
async 和 await
任务的常见用途之一是使用 Task.async/1
将顺序代码转换为并发代码,同时保留其语义。调用时,会创建一个新的进程,由调用者链接和监视。任务操作完成后,会向调用者发送一条包含结果的消息。
Task.await/2
用于读取任务发送的消息。
在使用 async
时,需要注意两点:
如果您使用异步任务,则 **必须等待** 响应,因为它们 **总是** 会发送。如果您不期望收到响应,请考虑使用
Task.start_link/1
,如下所述。异步任务会将调用者进程和生成的进程链接在一起。这意味着如果调用者崩溃,任务也会崩溃,反之亦然。这样做是为了:如果用于接收结果的进程不再存在,则完成计算没有任何意义。如果不需要这样做,您将需要使用监督任务,这将在下一节中介绍。
任务是进程
任务是进程,因此数据需要完全复制到它们。以以下代码为例:
large_data = fetch_large_data()
task = Task.async(fn -> do_some_work(large_data) end)
res = do_some_other_work()
res + Task.await(task)
上面的代码复制了所有 large_data
,这可能很消耗资源,具体取决于数据的规模。有两种方法可以解决这个问题。
首先,如果您只需要访问 large_data
的一部分,请在创建任务之前将其提取出来:
large_data = fetch_large_data()
subset_data = large_data.some_field
task = Task.async(fn -> do_some_work(subset_data) end)
或者,如果您可以在任务中完全移动数据加载,这可能更好:
task = Task.async(fn ->
large_data = fetch_large_data()
do_some_work(large_data)
end)
动态监督任务
Task.Supervisor
模块允许开发人员动态创建多个监督任务。
一个简短的示例如下:
{:ok, pid} = Task.Supervisor.start_link()
task =
Task.Supervisor.async(pid, fn ->
# Do something
end)
Task.await(task)
但是,在大多数情况下,您需要将任务主管添加到您的监督树中:
Supervisor.start_link([
{Task.Supervisor, name: MyApp.TaskSupervisor}
], strategy: :one_for_one)
现在,您可以通过传递主管的名称而不是 pid 来使用 async/await:
Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
# Do something
end)
|> Task.await()
我们鼓励开发人员尽可能依赖监督任务。监督任务提高了对当前运行的任务数量的可见性,并支持各种模式,使您能够明确控制如何处理结果、错误和超时。以下是总结:
使用
Task.Supervisor.start_child/2
允许您启动一个启动即忘的任务,如果您不关心它的结果或它是否成功完成。使用
Task.Supervisor.async/2
+Task.await/2
允许您并发执行任务并检索其结果。如果任务失败,调用者也会失败。使用
Task.Supervisor.async_nolink/2
+Task.yield/2
+Task.shutdown/2
允许您并发执行任务并在给定的时间范围内检索其结果或失败的原因。如果任务失败,调用者不会失败。您将在yield
或shutdown
上收到错误原因。
此外,主管保证所有任务在您的应用程序关闭时在可配置的关闭时间内终止。有关支持的操作的详细信息,请参阅 Task.Supervisor
模块。
分布式任务
使用 Task.Supervisor
,可以轻松地在节点之间动态启动任务:
# On the remote node named :remote@local
Task.Supervisor.start_link(name: MyApp.DistSupervisor)
# On the client
supervisor = {MyApp.DistSupervisor, :remote@local}
Task.Supervisor.async(supervisor, MyMod, :my_fun, [arg1, arg2, arg3])
请注意,在处理分布式任务时,应该使用 Task.Supervisor.async/5
函数,该函数需要显式模块、函数和参数,而不是 Task.Supervisor.async/3
函数,该函数使用匿名函数。这是因为匿名函数期望所有参与节点上存在相同的模块版本。有关分布式进程的更多信息,请检查 Agent
模块文档,因为其中描述的限制适用于整个生态系统。
静态监督任务
Task
模块实现了 child_spec/1
函数,该函数允许它直接在常规 Supervisor
(而不是 Task.Supervisor
)下启动,方法是传递包含要运行的函数的元组:
Supervisor.start_link([
{Task, fn -> :some_work end}
], strategy: :one_for_one)
这在您需要在设置监督树时执行某些步骤时非常有用。例如:预热缓存、记录初始化状态等。
如果您不想将任务代码直接放在 Supervisor
下,可以将 Task
包装在它自己的模块中,类似于您对 GenServer
或 Agent
的操作:
defmodule MyTask do
use Task
def start_link(arg) do
Task.start_link(__MODULE__, :run, [arg])
end
def run(arg) do
# ...
end
end
然后将其传递给主管:
Supervisor.start_link([
{MyTask, arg}
], strategy: :one_for_one)
由于这些任务是监督的,而不是直接与调用者链接,因此无法等待它们。默认情况下,Task.start/1
和 Task.start_link/1
函数用于启动即忘的任务,您不关心结果或它是否成功完成。
使用 Task
当您使用
use Task
时,Task
模块会定义一个child_spec/1
函数,因此您的模块可以用作监督树中的子节点。
use Task
定义一个 child_spec/1
函数,允许将定义的模块放入监督树中。生成的 child_spec/1
可以使用以下选项进行自定义:
:id
- 子节点规范标识符,默认为当前模块:restart
- 子节点何时应该重新启动,默认为:temporary
:shutdown
- 如何关闭子节点,立即关闭还是给它时间关闭
与 GenServer
、Agent
和 Supervisor
相反,Task 的默认 :restart
为 :temporary
。这意味着即使任务崩溃,它也不会重新启动。如果您希望任务在非成功退出时重新启动,请执行以下操作:
use Task, restart: :transient
如果您希望任务始终重新启动:
use Task, restart: :permanent
有关更详细的信息,请参阅 Supervisor
模块中的“子节点规范”部分。紧接在 use Task
之前的 @doc
注释将附加到生成的 child_spec/1
函数。
祖先和调用者跟踪
每当您启动一个新的进程时,Elixir 会通过进程字典中的 $ancestors
键来注释该进程的父进程。这通常用于跟踪监督树内的层次结构。
例如,我们建议开发人员始终在主管下启动任务。这提供了更高的可见性,并允许您控制在节点关闭时如何终止这些任务。这可能看起来像 Task.Supervisor.start_child(MySupervisor, task_function)
。这意味着,虽然您的代码是调用任务的代码,但任务的实际祖先是主管,因为主管是实际启动它的进程。
为了跟踪您的代码和任务之间的关系,我们使用进程字典中的 $callers
键。因此,假设上面有 Task.Supervisor
调用,我们有:
[your code] -- calls --> [supervisor] ---- spawns --> [task]
这意味着我们存储了以下关系:
[your code] [supervisor] <-- ancestor -- [task]
^ |
|--------------------- caller ---------------------|
可以使用 Process.get(:"$callers")
从进程字典中检索当前进程的调用者列表。这将返回 nil
或至少包含一个条目的列表 [pid_n, ..., pid2, pid1]
,其中 pid_n
是调用当前进程的 PID,pid2
调用了 pid_n
,而 pid2
被 pid1
调用。
如果任务崩溃,调用者字段将作为 :callers
键下的日志消息元数据的一部分包含在内。
总结
函数
Task 结构体。
启动一个必须等待的任务。
启动一个必须等待的任务。
返回一个流,该流在 enumerable
中的每个元素上并发运行给定的函数 fun
。
返回一个流,其中给定的函数(module
和 function_name
)并发映射在 enumerable
中的每个元素上。
等待任务响应并返回它。
等待来自多个任务的响应并返回它们。
返回在主管下启动任务的规范。
启动一个立即使用给定的 result
完成的任务。
忽略现有任务。
取消链接并关闭任务,然后检查响应。
启动一个任务。
将任务作为监督树的一部分启动,使用给定的 fun
。
将任务作为监督树的一部分启动,使用给定的 module
、function
和 args
。
临时阻塞调用者进程,等待任务响应。
在给定的时间间隔内,对多个任务进行 yield 操作。
类型
函数
Task 结构体。
它包含以下字段
启动一个必须等待的任务。
fun
必须是一个零元匿名函数。此函数会生成一个与调用方进程链接并进行监控的进程。返回一个包含相关信息的 Task
结构体。
如果您启动一个 async
,您 **必须等待**。这可以通过调用 Task.await/2
或 Task.yield/2
,然后在返回的任务上调用 Task.shutdown/2
来实现。或者,如果您在 GenServer
内部生成一个任务,那么 GenServer
会自动为您等待,并使用任务响应和关联的 :DOWN
消息调用 GenServer.handle_info/2
。
有关异步任务的通用使用方法的更多信息,请阅读 Task
模块文档。
链接
此函数会生成一个与调用方进程链接并进行监控的进程。链接部分很重要,因为如果父进程死亡,它会中止任务。它还保证在添加异步调用后,异步/等待之前的代码具有相同的属性。例如,假设您有以下代码
x = heavy_fun()
y = some_fun()
x + y
现在您想使 heavy_fun()
异步
x = Task.async(&heavy_fun/0)
y = some_fun()
Task.await(x) + y
与之前一样,如果 heavy_fun/0
失败,整个计算都会失败,包括调用方进程。如果您不希望任务失败,那么您必须以与没有异步调用时相同的方式更改 heavy_fun/0
代码。例如,要么返回 {:ok, val} | :error
结果,要么,在更极端的情况下,使用 try/rescue
。换句话说,异步任务应该被认为是调用方进程的扩展,而不是将其与所有错误隔离的机制。
如果您不想将调用方链接到任务,那么您必须使用带有 Task.Supervisor
的受监督的任务,并调用 Task.Supervisor.async_nolink/2
。
无论如何,请避免以下任何操作
将
:trap_exit
设置为true
- 捕获退出应该仅在特殊情况下使用,因为它会使您的进程不仅对任务的退出免疫,还会对其他任何进程的退出免疫。此外,即使在捕获退出时,调用
await
仍然会在任务在未发送其结果的情况下终止时退出。取消与使用
async
/await
启动的任务进程的链接。如果您取消了进程的链接,而任务不属于任何主管,那么在调用方进程死亡的情况下,您可能会留下悬挂的任务。
元数据
使用此函数创建的任务在其 :mfa
元数据字段中存储 :erlang.apply/2
,它在内部用于应用匿名函数。如果您希望使用另一个函数作为元数据,请使用 async/3
。
启动一个必须等待的任务。
类似于 async/1
,只是要启动的函数由给定的 module
、function_name
和 args
指定。module
、function_name
及其元组存储在 :mfa
字段中,用于反射目的。
@spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()
返回一个流,该流在 enumerable
中的每个元素上并发运行给定的函数 fun
。
与 async_stream/5
的工作方式相同,但使用匿名函数而不是模块-函数-参数元组。 fun
必须是一个一元匿名函数。
每个 enumerable
元素都作为参数传递给给定的函数 fun
,并由其自己的任务处理。与 async/1
类似,这些任务将与调用方进程链接。
示例
异步计算每个字符串中的代码点,然后使用 reduce 将这些计数加在一起。
iex> strings = ["long string", "longer string", "there are many of these"]
iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
47
有关讨论、选项和更多示例,请参见 async_stream/5
。
@spec async_stream(Enumerable.t(), module(), atom(), [term()], keyword()) :: Enumerable.t()
返回一个流,其中给定的函数(module
和 function_name
)并发映射在 enumerable
中的每个元素上。
每个 enumerable
元素都将附加到给定的 args
并由其自己的任务处理。这些任务将与一个中间进程链接,该进程随后与调用方进程链接。这意味着任务中的失败会终止调用方进程,而调用方进程中的失败会终止所有任务。
在流式传输时,每个任务都会在成功完成时发出 {:ok, value}
,如果调用方正在捕获退出,则会发出 {:exit, reason}
。如果使用 :zip_input_on_exit
选项退出,则可能出现 {:exit, {element, reason}}
。结果的顺序取决于 :ordered
选项的值。
可以通过选项控制并发级别和任务允许运行的时间(请参见下面的“选项”部分)。
考虑使用 Task.Supervisor.async_stream/6
在主管下启动任务。如果您发现自己正在捕获退出以确保任务中的错误不会终止调用方进程,请考虑使用 Task.Supervisor.async_stream_nolink/6
启动不与调用方进程链接的任务。
选项
:max_concurrency
- 设置同时运行的任务的最大数量。默认为System.schedulers_online/0
。:ordered
- 结果是否应该以与输入流相同的顺序返回。当输出有序时,Elixir 可能需要缓冲结果以按原始顺序发出。将此选项设置为 false 将禁用缓冲的需要,但代价是移除排序。这在您仅将任务用于副作用时也很有用。请注意,无论:ordered
设置为什么,任务都将异步处理。如果您需要按顺序处理元素,请考虑使用Enum.map/2
或Enum.each/2
。默认为true
。:timeout
- 每个任务允许执行的最大时间(以毫秒或:infinity
为单位)。默认为5000
。:on_timeout
- 任务超时时该怎么办。可能的值为:exit
(默认) - 调用方(生成任务的进程)退出。:kill_task
- 超时的任务被杀死。该任务发出的值为{:exit, :timeout}
。
:zip_input_on_exit
- (自 v1.14.0 起) 将原始输入添加到:exit
元组中。该任务发出的值为{:exit, {input, reason}}
,其中input
是导致处理期间退出的集合元素。默认为false
。
示例
让我们构建一个流,然后对其进行枚举
stream = Task.async_stream(collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
可以使用 :max_concurrency
选项增加或减少并发性。例如,如果任务是 I/O 密集型,则可以增加其值
max_concurrency = System.schedulers_online() * 2
stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
Enum.to_list(stream)
如果您不关心计算结果,可以使用 Stream.run/1
运行流。还要设置 ordered: false
,因为您也不关心结果的顺序
stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
Stream.run(stream)
先完成的异步任务
您也可以使用 async_stream/3
执行 M 个任务并找到先完成的 N 个任务。例如
[
&heavy_call_1/0,
&heavy_call_2/0,
&heavy_call_3/0
]
|> Task.async_stream(fn fun -> fun.() end, ordered: false, max_concurrency: 3)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.take(2)
在上面的示例中,我们正在执行三个任务,并等待前两个完成。我们使用 Stream.filter/2
将自己限制为仅成功完成的任务,然后使用 Enum.take/2
检索 N 项。请注意,设置 ordered: false
和 max_concurrency: M
很重要,其中 M 是任务的数量,以确保所有调用都同时执行。
注意:无界异步 + take
如果您希望处理大量项目并仅保留部分结果,则最终可能会处理比预期更多的项目。让我们看一个例子
1..100
|> Task.async_stream(fn i ->
Process.sleep(100)
IO.puts(to_string(i))
end)
|> Enum.take(10)
在具有 8 个内核的机器上运行上面的示例将处理 16 个项目,即使您只想要 10 个元素,因为 async_stream/3
会同时处理项目。这是因为一次会处理 8 个元素。然后所有 8 个元素几乎同时完成,导致 8 个元素被启动以进行处理。在这额外的 8 个元素中,只有 2 个会被使用,其余的会被终止。
根据问题,您可以在前端过滤或限制元素的数量
1..100
|> Stream.take(10)
|> Task.async_stream(fn i ->
Process.sleep(100)
IO.puts(to_string(i))
end)
|> Enum.to_list()
在其他情况下,您可能希望调整 :max_concurrency
以限制可以同时处理的元素数量,代价是降低并发性。您也可以将要获取的元素数量设置为 :max_concurrency
的倍数。例如,在上面的示例中设置 max_concurrency: 5
。
等待任务响应并返回它。
如果任务进程死亡,调用者进程将以与任务相同的理由退出。
可以使用毫秒或 :infinity
来设置超时时间,默认值为 5000
。如果超时,调用者进程将退出。如果任务进程与调用者进程链接,当任务使用 async
启动时就会发生这种情况,则任务进程也会退出。如果任务进程正在捕获退出或未与调用者进程链接,则它将继续运行。
此函数假设任务的监视器仍然处于活动状态,或者监视器的 :DOWN
消息位于消息队列中。如果它已被取消监视,或者消息已接收,此函数将等待超时持续时间,以等待消息。
对于任何给定的任务,此函数只能调用一次。如果您希望能够多次检查长时间运行的任务是否已完成计算,请使用 yield/2
。
示例
iex> task = Task.async(fn -> 1 + 1 end)
iex> Task.await(task)
2
与 OTP 行为的兼容性
不建议在 OTP 行为(如 GenServer
)内 await
长时间运行的任务。相反,您应该在您的 GenServer.handle_info/2
回调中匹配来自任务的消息。
GenServer 将在 handle_info/2
上接收两条消息
{ref, result}
- 响应消息,其中ref
是task.ref
返回的监视器引用,result
是任务结果{:DOWN, ref, :process, pid, reason}
- 由于所有任务也都受到监视,您还将收到由Process.monitor/1
传递的:DOWN
消息。如果您在没有回复的情况下收到:DOWN
消息,则意味着任务崩溃了
另一个需要牢记的是,由 Task.async/1
启动的任务始终与其调用者链接,您可能不希望 GenServer 在任务崩溃时崩溃。因此,最好在 OTP 行为中使用 Task.Supervisor.async_nolink/3
。为了完整起见,以下是一个启动任务并处理其结果的 GenServer 示例
defmodule GenServerTaskExample do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def init(_opts) do
# We will keep all running tasks in a map
{:ok, %{tasks: %{}}}
end
# Imagine we invoke a task from the GenServer to access a URL...
def handle_call(:some_message, _from, state) do
url = ...
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
# After we start the task, we store its reference and the url it is fetching
state = put_in(state.tasks[task.ref], url)
{:reply, :ok, state}
end
# If the task succeeds...
def handle_info({ref, result}, state) do
# The task succeed so we can demonitor its reference
Process.demonitor(ref, [:flush])
{url, state} = pop_in(state.tasks[ref])
IO.puts "Got #{inspect(result)} for URL #{inspect url}"
{:noreply, state}
end
# If the task fails...
def handle_info({:DOWN, ref, _, _, reason}, state) do
{url, state} = pop_in(state.tasks[ref])
IO.puts "URL #{inspect url} failed with reason #{inspect(reason)}"
{:noreply, state}
end
end
定义了服务器后,您需要在您的监督树中启动上面的任务监督器和 GenServer
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor},
{GenServerTaskExample, name: MyApp.GenServerTaskExample}
]
Supervisor.start_link(children, strategy: :one_for_one)
等待来自多个任务的响应并返回它们。
此函数接收任务列表,并在给定的时间间隔内等待它们的回复。它返回结果列表,顺序与 tasks
输入参数中提供的任务相同。
如果任何任务进程死亡,调用者进程将以与该任务相同的理由退出。
可以使用毫秒或 :infinity
来设置超时时间,默认值为 5000
。如果超时,调用者进程将退出。任何与调用者进程链接的任务进程(当使用 async
启动任务时就会发生这种情况)也将退出。任何正在捕获退出或未与调用者进程链接的任务进程将继续运行。
此函数假设任务的监视器仍然处于活动状态,或者监视器的 :DOWN
消息位于消息队列中。如果任何任务已被取消监视,或者消息已接收,此函数将等待超时持续时间。
对于任何给定的任务,此函数只能调用一次。如果您希望能够多次检查长时间运行的任务是否已完成计算,请使用 yield_many/2
。
与 OTP 行为的兼容性
不建议在 OTP 行为(如 GenServer
)内 await
长时间运行的任务。有关更多信息,请参见 await/2
。
示例
iex> tasks = [
...> Task.async(fn -> 1 + 1 end),
...> Task.async(fn -> 2 + 3 end)
...> ]
iex> Task.await_many(tasks)
[2, 5]
@spec child_spec(term()) :: Supervisor.child_spec()
返回在主管下启动任务的规范。
arg
作为参数传递给规范的 :start
字段中的 Task.start_link/1
。
有关更多信息,请参见 Supervisor
模块、Supervisor.child_spec/2
函数和 Supervisor.child_spec/0
类型。
启动一个立即使用给定的 result
完成的任务。
与 async/1
不同,此任务不会生成链接进程。它可以像任何其他任务一样被等待或生成。
用法
在某些情况下,创建表示已运行并生成结果的“已完成”任务很有用。例如,在处理数据时,您可能能够在调度进一步处理之前确定某些输入无效
def process(data) do
tasks =
for entry <- data do
if invalid_input?(entry) do
Task.completed({:error, :invalid_input})
else
Task.async(fn -> further_process(entry) end)
end
end
Task.await_many(tasks)
end
在许多情况下,可以使用 Task.completed/1
来代替直接返回结果。通常,只有在处理混合异步性时才需要此变体,当一组输入将被部分同步和部分异步处理时。
忽略现有任务。
这意味着任务将继续运行,但它将被解除链接,您将无法再生成、等待或关闭它。
如果在忽略任务之前收到回复,则返回 {:ok, reply}
,如果任务在忽略它之前死亡,则返回 {:exit, reason}
,否则返回 nil
。
重要提示:避免使用 Task.async/1,3
,然后立即忽略任务。如果您想启动您不关心其结果的任务,请使用 Task.Supervisor.start_child/2
。
取消链接并关闭任务,然后检查响应。
如果在关闭任务时收到回复,则返回 {:ok, reply}
,如果任务死亡,则返回 {:exit, reason}
,否则返回 nil
。关闭后,您将无法再等待或生成它。
第二个参数是超时时间或 :brutal_kill
。在超时的情况下,会向任务进程发送 :shutdown
退出信号,如果它在超时时间内没有退出,则会将其杀死。使用 :brutal_kill
会立即杀死任务。如果任务异常终止(可能被另一个进程杀死),此函数将以相同的理由退出。
终止调用者时,不需要调用此函数,除非以 :normal
退出或任务正在捕获退出。如果调用者以 :normal
以外的理由退出,并且任务没有捕获退出,则调用者的退出信号将停止任务。调用者可以使用 :shutdown
退出以关闭所有与其链接的进程(包括任务),这些进程未捕获退出,不会生成任何日志消息。
如果没有进程与任务链接,例如由 Task.completed/1
启动的任务,我们将相应地检查响应或错误,但不会关闭进程。
如果任务的监视器已被取消监视或已接收,并且消息队列中没有等待的响应,此函数将返回 {:exit, :noproc}
,因为无法确定结果或退出原因。
启动一个任务。
fun
必须是零元匿名函数。
这应该只在任务用于副作用(如 I/O)并且您对它的结果或是否成功完成没有兴趣时使用。
如果当前节点关闭,即使任务未完成,节点也会终止。出于这个原因,我们建议使用 Task.Supervisor.start_child/2
,它允许您通过 :shutdown
选项控制关闭时间。
启动一个任务。
这应该只在任务用于副作用(如 I/O)并且您对它的结果或是否成功完成没有兴趣时使用。
如果当前节点关闭,即使任务未完成,节点也会终止。出于这个原因,我们建议使用 Task.Supervisor.start_child/2
,它允许您通过 :shutdown
选项控制关闭时间。
将任务作为监督树的一部分启动,使用给定的 fun
。
fun
必须是零元匿名函数。
这是在监督树下启动静态监督任务的用途。
将任务作为监督树的一部分启动,使用给定的 module
、function
和 args
。
这是在监督树下启动静态监督任务的用途。
临时阻塞调用者进程,等待任务响应。
如果收到回复,则返回 {:ok, reply}
,如果未收到回复,则返回 nil
,如果任务已退出,则返回 {:exit, reason}
。请记住,通常情况下,任务失败也会导致拥有该任务的进程退出。因此,如果以下至少一个条件适用,此函数可以返回 {:exit, reason}
- 任务进程以
:normal
退出 - 任务未与调用者链接(任务使用
Task.Supervisor.async_nolink/2
或Task.Supervisor.async_nolink/4
启动) - 调用者正在捕获退出
可以使用毫秒或 :infinity
来设置超时时间,默认值为 5000
。如果在收到来自任务的消息之前时间用完,此函数将返回 nil
,监视器将保持活动状态。因此,可以对同一任务多次调用 yield/2
。
此函数假设任务的监视器仍然处于活动状态,或者监视器的 :DOWN
消息位于消息队列中。如果它已被取消监视或消息已接收,此函数将等待超时持续时间,以等待消息。
如果您打算在任务在 timeout
毫秒内没有响应的情况下关闭它,您应该将此与 shutdown/1
链接起来,如下所示
case Task.yield(task, timeout) || Task.shutdown(task) do
{:ok, result} ->
result
nil ->
Logger.warning("Failed to get a result in #{timeout}ms")
nil
end
如果您打算检查任务,但在超时后让它继续运行,您可以将此与 ignore/1
链接起来,如下所示
case Task.yield(task, timeout) || Task.ignore(task) do
{:ok, result} ->
result
nil ->
Logger.warning("Failed to get a result in #{timeout}ms")
nil
end
这确保了如果任务在 timeout
之后但在调用 shutdown/1
之前完成,您仍然会获得结果,因为 shutdown/1
被设计为处理这种情况并返回结果。
@spec yield_many([t()], timeout()) :: [{t(), {:ok, term()} | {:exit, term()} | nil}]
@spec yield_many([t()], limit: pos_integer(), timeout: timeout(), on_timeout: :nothing | :ignore | :kill_task ) :: [{t(), {:ok, term()} | {:exit, term()} | nil}]
在给定的时间间隔内,对多个任务进行 yield 操作。
此函数接收任务列表,并在给定的时间间隔内等待它们的回复。它返回一个由两个元素元组组成的列表,第一个元素是任务,第二个元素是生成的結果。返回列表中的任务将与 tasks
输入参数中提供的任务顺序相同。
与 yield/2
类似,每个任务的结果将是
{:ok, term}
如果任务在给定的时间间隔内成功报告了结果{:exit, reason}
如果任务已终止nil
如果任务仍在运行,可能是因为已达到限制或超过了超时时间
有关更多信息,请查看 yield/2
。
示例
Task.yield_many/2
允许开发人员生成多个任务并检索在给定时间段内收到的结果。如果将其与 Task.shutdown/2
(或 Task.ignore/1
)结合使用,它允许我们收集这些结果并取消(或忽略)未及时回复的任务。
让我们来看一个例子。
tasks =
for i <- 1..10 do
Task.async(fn ->
Process.sleep(i * 1000)
i
end)
end
tasks_with_results = Task.yield_many(tasks, timeout: 5000)
results =
Enum.map(tasks_with_results, fn {task, res} ->
# Shut down the tasks that did not reply nor exit
res || Task.shutdown(task, :brutal_kill)
end)
# Here we are matching only on {:ok, value} and
# ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
for {:ok, value} <- results do
IO.inspect(value)
end
在上面的示例中,我们创建了休眠 1 到 10 秒并返回其休眠秒数的任务。如果一次执行所有代码,您应该看到 1 到 5 被打印出来,因为这些是在给定时间内回复的任务。所有其他任务都将使用 Task.shutdown/2
调用关闭。
为了方便起见,您可以通过将 :on_timeout
选项指定为 :kill_task
(或 :ignore
)来实现类似于上述的行为。如果您希望在超时时退出调用者进程,请查看 Task.await_many/2
。
选项
第二个参数是超时或选项,默认为此
:limit
- 要等待的最大任务数。如果在超时之前达到限制,则此函数立即返回,而不会触发:on_timeout
行为:timeout
- 每个任务允许执行的最大时间(以毫秒或:infinity
为单位)。默认为5000
。:on_timeout
- 任务超时时该怎么办。可能的值为:nothing
- 不执行任何操作(默认)。这些任务仍可在以后等待、产生、忽略或关闭。:ignore
- 任务的结果将被忽略。:kill_task
- 超时任务将被杀死。