查看源代码 Stream (Elixir v1.16.2)
用于创建和组合流的函数。
流是可组合的、延迟的枚举器(有关枚举器的介绍,请参阅 Enum
模块)。任何在枚举期间逐个生成元素的枚举器都称为流。例如,Elixir 的 Range
是一个流
iex> range = 1..5
1..5
iex> Enum.map(range, &(&1 * 2))
[2, 4, 6, 8, 10]
在上面的示例中,当我们在范围内进行映射时,枚举的元素是在枚举期间逐个创建的。 Stream
模块允许我们映射范围,而不会触发其枚举
iex> range = 1..3
iex> stream = Stream.map(range, &(&1 * 2))
iex> Enum.map(stream, &(&1 + 1))
[3, 5, 7]
请注意,我们从一个范围开始,然后我们创建了一个流,该流旨在将范围中的每个元素乘以 2。此时,没有进行任何计算。只有当 Enum.map/2
被调用时,我们才会真正枚举范围中的每个元素,将它乘以 2 并加上 1。我们说 Stream
中的函数是延迟的,而 Enum
中的函数是急切的。
由于它们的延迟性,流在处理大型(甚至无限的)集合时非常有用。当使用 Enum
链多个操作时,会创建中间列表,而 Stream
创建一个计算配方,这些计算将在稍后执行。让我们看另一个例子
1..3
|> Enum.map(&IO.inspect(&1))
|> Enum.map(&(&1 * 2))
|> Enum.map(&IO.inspect(&1))
1
2
3
2
4
6
#=> [2, 4, 6]
请注意,我们首先打印了列表中的每个元素,然后将每个元素乘以 2,最后打印了每个新值。在这个例子中,列表被枚举了三次。让我们看看流的例子
stream = 1..3
|> Stream.map(&IO.inspect(&1))
|> Stream.map(&(&1 * 2))
|> Stream.map(&IO.inspect(&1))
Enum.to_list(stream)
1
2
2
4
3
6
#=> [2, 4, 6]
尽管最终结果相同,但元素打印的顺序发生了变化!使用流,我们打印第一个元素,然后打印它的两倍。在这个例子中,列表只被枚举了一次!
这就是我们之前说流是可组合的、延迟的枚举器时所指的。请注意,我们可以多次调用 Stream.map/2
,有效地组合流并保持它们的延迟性。只有当您从 Enum
模块调用函数时才会执行计算。
与 Enum
一样,此模块中的函数在线性时间内工作。这意味着执行操作所需的时间与列表长度的增长速度相同。这在 Stream.map/2
等操作中是预期的。毕竟,如果我们要遍历流上的每个元素,流越长,我们需要遍历的元素就越多,所需的时间就越长。
创建流
Elixir 的标准库中有很多函数返回流,一些例子是
IO.stream/2
- 逐行流式传输输入URI.query_decoder/1
- 解码查询字符串,成对
此模块还提供了许多用于创建流的便利函数,例如 Stream.cycle/1
、Stream.unfold/2
、Stream.resource/3
等等。
请注意,此模块中的函数保证返回枚举器。由于枚举器可以具有不同的形状(结构体、匿名函数等),此模块中的函数可能返回其中任何一种形状,并且这可能会随时更改。例如,今天返回匿名函数的函数在将来的版本中可能会返回一个结构体。
概要
函数
通过缓冲 fun
返回相同值的元素来对 enum
进行分块。
是 chunk_every(enum, count, count)
的快捷方式。
将枚举器流式传输为块,每个块包含 count
个元素,其中每个新块从枚举器的 step
个元素开始。
对 enum
进行分块,在每个块被发出时进行细粒度的控制。
创建一个流,该流枚举枚举器中的每个枚举器。
创建一个流,该流枚举第一个参数,然后是第二个参数。
创建一个流,该流无限循环遍历给定的枚举器。
创建一个流,该流只发出与上次发出的元素不同的元素。
创建一个流,该流只发出元素,如果对元素调用 fun
的结果不同于(存储的)对上次发出的元素调用 fun
的结果。
延迟地从枚举器中删除接下来的 n
个元素。
创建一个流,该流从枚举器中删除每个第 nth
个元素。
延迟地删除枚举器中的元素,直到给定函数返回真值。
在流中将给定元素重复 n
次。
对每个元素执行给定的函数。
创建一个流,该流根据枚举期间对给定函数的调用来过滤元素。
对 enumerable
上的给定 fun
进行映射,并扁平化结果。
延迟地在枚举的每个元素之间插入 intersperse_element
。
创建一个流,该流在给定的 n
毫秒时间段后发出一个值。
将流值作为副作用注入给定的可收集对象中。
发出一个值序列,从 start_value
开始。连续的值是通过对前一个值调用 next_fun
生成的。
创建一个流,该流将在枚举期间应用给定的函数。
创建一个流,该流将在枚举器中每隔 nth
个元素应用给定的函数。
创建一个流,该流将在枚举期间根据给定函数拒绝元素。
返回一个流,该流是通过重复调用 generator_fun
生成的。
为给定的资源发出一个值序列。
运行给定的流。
创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用枚举器中的第一个元素作为起始值。
创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用给定的 acc
作为起始值。
延迟地从枚举器中获取接下来的 count
个元素,并停止枚举。
创建一个流,该流从枚举器中获取每个第 nth
个元素。
延迟地获取枚举器中的元素,直到给定函数返回真值。
创建一个流,该流在 n
毫秒后发出一个值。
转换现有的流。
类似于 Stream.transform/5
,只是没有提供 last_fun
。
使用基于函数的启动、最后一个和之后回调来转换现有的流。
为给定的累加器发出一个值序列。
创建一个流,该流只发出唯一的元素。
创建一个流,该流只发出唯一的元素,通过删除函数 fun
返回重复元素的元素来实现。
创建一个流,其中枚举器中的每个元素都将与其索引一起封装在一个元组中。
将来自有限枚举器集合的对应元素压缩成一个元组流。
延迟地将两个枚举器压缩在一起。
延迟地将来自有限枚举器集合的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun
函数对其进行转换。
延迟地将两个枚举器中的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun
函数对其进行转换。
类型
函数
@spec chunk_by(Enumerable.t(), (element() -> any())) :: Enumerable.t()
通过缓冲 fun
返回相同值的元素来对 enum
进行分块。
只有当 fun
返回一个新值或 enum
结束时才会发出元素。
示例
iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
iex> Enum.to_list(stream)
[[1], [2, 2], [3], [4, 4, 6], [7, 7]]
@spec chunk_every(Enumerable.t(), pos_integer()) :: Enumerable.t()
是 chunk_every(enum, count, count)
的快捷方式。
@spec chunk_every( Enumerable.t(), pos_integer(), pos_integer(), Enumerable.t() | :discard ) :: Enumerable.t()
将枚举器流式传输为块,每个块包含 count
个元素,其中每个新块从枚举器的 step
个元素开始。
step
是可选的,如果没有传递,则默认为 count
,即块不重叠。分块将在集合结束或我们发出不完整的块时停止。
如果最后一个块没有足够的元素来填充块,则从 leftover
中获取元素来填充块。如果 leftover
中没有足够的元素来填充块,则返回一个部分块,其中包含少于 count
个元素。
如果在 leftover
中给出 :discard
,则最后一个块将被丢弃,除非它恰好包含 count
个元素。
示例
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list()
[[1, 2], [3, 4], [5, 6]]
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, :discard) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5]]
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5], [5, 6, 7]]
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list()
[[1, 2, 3], [4, 5, 6]]
iex> Stream.chunk_every([1, 2, 3, 4], 3, 3, Stream.cycle([0])) |> Enum.to_list()
[[1, 2, 3], [4, 0, 0]]
@spec chunk_while( Enumerable.t(), acc(), (element(), acc() -> {:cont, chunk, acc()} | {:cont, acc()} | {:halt, acc()}), (acc() -> {:cont, chunk, acc()} | {:cont, acc()}) ) :: Enumerable.t() when chunk: any()
对 enum
进行分块,在每个块被发出时进行细粒度的控制。
chunk_fun
接收当前元素和累加器,并必须返回 {:cont, element, acc}
以发出给定块并继续使用累加器,或者返回 {:cont, acc}
以不发出任何块并继续使用返回的累加器。
after_fun
在迭代完成后被调用,也必须返回 {:cont, element, acc}
或 {:cont, acc}
。
示例
iex> chunk_fun = fn element, acc ->
...> if rem(element, 2) == 0 do
...> {:cont, Enum.reverse([element | acc]), []}
...> else
...> {:cont, [element | acc]}
...> end
...> end
iex> after_fun = fn
...> [] -> {:cont, []}
...> acc -> {:cont, Enum.reverse(acc), []}
...> end
iex> stream = Stream.chunk_while(1..10, [], chunk_fun, after_fun)
iex> Enum.to_list(stream)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
@spec concat(Enumerable.t()) :: Enumerable.t()
创建一个流,该流枚举枚举器中的每个枚举器。
示例
iex> stream = Stream.concat([1..3, 4..6, 7..9])
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6, 7, 8, 9]
@spec concat(Enumerable.t(), Enumerable.t()) :: Enumerable.t()
创建一个流,该流枚举第一个参数,然后是第二个参数。
示例
iex> stream = Stream.concat(1..3, 4..6)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6]
iex> stream1 = Stream.cycle([1, 2, 3])
iex> stream2 = Stream.cycle([4, 5, 6])
iex> stream = Stream.concat(stream1, stream2)
iex> Enum.take(stream, 6)
[1, 2, 3, 1, 2, 3]
@spec cycle(Enumerable.t()) :: Enumerable.t()
创建一个流,该流无限循环遍历给定的枚举器。
示例
iex> stream = Stream.cycle([1, 2, 3])
iex> Enum.take(stream, 5)
[1, 2, 3, 1, 2]
@spec dedup(Enumerable.t()) :: Enumerable.t()
创建一个流,该流只发出与上次发出的元素不同的元素。
此函数只需要存储最后一个发出的元素。
元素使用 ===/2
进行比较。
示例
iex> Stream.dedup([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
[1, 2, 3, 2, 1]
@spec dedup_by(Enumerable.t(), (element() -> term())) :: Enumerable.t()
创建一个流,该流只发出元素,如果对元素调用 fun
的结果不同于(存储的)对上次发出的元素调用 fun
的结果。
示例
iex> Stream.dedup_by([{1, :x}, {2, :y}, {2, :z}, {1, :x}], fn {x, _} -> x end) |> Enum.to_list()
[{1, :x}, {2, :y}, {1, :x}]
@spec drop(Enumerable.t(), integer()) :: Enumerable.t()
延迟地从枚举器中删除接下来的 n
个元素。
如果给出负数 n
,它将从集合中删除最后 n
个元素。请注意,此实现机制将延迟任何元素的发出,直到枚举器发出 n
个额外元素。
示例
iex> stream = Stream.drop(1..10, 5)
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]
iex> stream = Stream.drop(1..10, -5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec drop_every(Enumerable.t(), non_neg_integer()) :: Enumerable.t()
创建一个流,该流从枚举器中删除每个第 nth
个元素。
第一个元素总是被删除,除非 nth
为 0。
nth
必须是非负整数。
示例
iex> stream = Stream.drop_every(1..10, 2)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]
iex> stream = Stream.drop_every(1..1000, 1)
iex> Enum.to_list(stream)
[]
iex> stream = Stream.drop_every([1, 2, 3, 4, 5], 0)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec drop_while(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
延迟地删除枚举器中的元素,直到给定函数返回真值。
示例
iex> stream = Stream.drop_while(1..10, &(&1 <= 5))
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]
@spec duplicate(any(), non_neg_integer()) :: Enumerable.t()
在流中将给定元素重复 n
次。
n
是一个大于或等于 0
的整数。
如果 n
为 0
,则返回一个空流。
示例
iex> stream = Stream.duplicate("hello", 0)
iex> Enum.to_list(stream)
[]
iex> stream = Stream.duplicate("hi", 1)
iex> Enum.to_list(stream)
["hi"]
iex> stream = Stream.duplicate("bye", 2)
iex> Enum.to_list(stream)
["bye", "bye"]
iex> stream = Stream.duplicate([1, 2], 3)
iex> Enum.to_list(stream)
[[1, 2], [1, 2], [1, 2]]
@spec each(Enumerable.t(), (element() -> term())) :: Enumerable.t()
对每个元素执行给定的函数。
流中的值不会改变,因此此函数对于向流添加副作用(如打印)很有用。如果需要生成不同的流,请参见 map/2
。
示例
iex> stream = Stream.each([1, 2, 3], fn x -> send(self(), x) end)
iex> Enum.to_list(stream)
iex> receive do: (x when is_integer(x) -> x)
1
iex> receive do: (x when is_integer(x) -> x)
2
iex> receive do: (x when is_integer(x) -> x)
3
@spec filter(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
创建一个流,该流根据枚举期间对给定函数的调用来过滤元素。
示例
iex> stream = Stream.filter([1, 2, 3], fn x -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[2]
@spec flat_map(Enumerable.t(), (element() -> Enumerable.t())) :: Enumerable.t()
对 enumerable
上的给定 fun
进行映射,并扁平化结果。
此函数返回一个新的流,该流通过将对 enumerable
的每个元素调用 fun
的结果连接在一起构建。
示例
iex> stream = Stream.flat_map([1, 2, 3], fn x -> [x, x * 2] end)
iex> Enum.to_list(stream)
[1, 2, 2, 4, 3, 6]
iex> stream = Stream.flat_map([1, 2, 3], fn x -> [[x]] end)
iex> Enum.to_list(stream)
[[1], [2], [3]]
@spec intersperse(Enumerable.t(), any()) :: Enumerable.t()
延迟地在枚举的每个元素之间插入 intersperse_element
。
示例
iex> Stream.intersperse([1, 2, 3], 0) |> Enum.to_list()
[1, 0, 2, 0, 3]
iex> Stream.intersperse([1], 0) |> Enum.to_list()
[1]
iex> Stream.intersperse([], 0) |> Enum.to_list()
[]
@spec interval(timer()) :: Enumerable.t()
创建一个流,该流在给定的 n
毫秒时间段后发出一个值。
发出的值是从 0
开始的递增计数器。每次流式传输新元素时,此操作都会在给定时间间隔内阻塞调用方。
不要使用此函数生成数字序列。如果不需要阻塞调用方进程,请使用 Stream.iterate(0, & &1 + 1)
代替。
示例
iex> Stream.interval(10) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
@spec into(Enumerable.t(), Collectable.t(), (term() -> term())) :: Enumerable.t()
将流值作为副作用注入给定的可收集对象中。
@spec iterate(element(), (element() -> element())) :: Enumerable.t()
发出一个值序列,从 start_value
开始。连续的值是通过对前一个值调用 next_fun
生成的。
示例
iex> Stream.iterate(0, &(&1 + 1)) |> Enum.take(5)
[0, 1, 2, 3, 4]
@spec map(Enumerable.t(), (element() -> any())) :: Enumerable.t()
创建一个流,该流将在枚举期间应用给定的函数。
示例
iex> stream = Stream.map([1, 2, 3], fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6]
@spec map_every(Enumerable.t(), non_neg_integer(), (element() -> any())) :: Enumerable.t()
创建一个流,该流将在枚举器中每隔 nth
个元素应用给定的函数。
第一个元素始终传递给给定函数。
nth
必须是非负整数。
示例
iex> stream = Stream.map_every(1..10, 2, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 2, 6, 4, 10, 6, 14, 8, 18, 10]
iex> stream = Stream.map_every([1, 2, 3, 4, 5], 1, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]
iex> stream = Stream.map_every(1..5, 0, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec reject(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
创建一个流,该流将在枚举期间根据给定函数拒绝元素。
示例
iex> stream = Stream.reject([1, 2, 3], fn x -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[1, 3]
@spec repeatedly((-> element())) :: Enumerable.t()
返回一个流,该流是通过重复调用 generator_fun
生成的。
示例
# Although not necessary, let's seed the random algorithm
iex> :rand.seed(:exsss, {1, 2, 3})
iex> Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)
[0.5455598952593053, 0.6039309974353404, 0.6684893034823949]
@spec resource( (-> acc()), (acc() -> {[element()], acc()} | {:halt, acc()}), (acc() -> term()) ) :: Enumerable.t()
为给定的资源发出一个值序列。
类似于 transform/3
,但初始累加值是通过 start_fun
延迟计算的,并在枚举结束时执行一个 after_fun
(在成功和失败的情况下)。
通过使用前一个累加器(初始值为 start_fun
返回的结果)调用 next_fun
来生成连续的值,它必须返回一个包含要发出的元素列表和下一个累加器的元组。如果返回 {:halt, acc}
,则枚举结束。
顾名思义,此函数对于从资源中流式传输值很有用。
示例
Stream.resource(
fn -> File.open!("sample") end,
fn file ->
case IO.read(file, :line) do
data when is_binary(data) -> {[data], file}
_ -> {:halt, file}
end
end,
fn file -> File.close(file) end
)
iex> Stream.resource(
...> fn ->
...> {:ok, pid} = StringIO.open("string")
...> pid
...> end,
...> fn pid ->
...> case IO.getn(pid, "", 1) do
...> :eof -> {:halt, pid}
...> char -> {[char], pid}
...> end
...> end,
...> fn pid -> StringIO.close(pid) end
...> ) |> Enum.to_list()
["s", "t", "r", "i", "n", "g"]
@spec run(Enumerable.t()) :: :ok
运行给定的流。
当需要运行流以产生副作用,并且对它的返回结果没有兴趣时,这很有用。
示例
打开一个文件,将所有 #
替换为 %
,并将流式传输到另一个文件,而无需将整个文件加载到内存中
File.stream!("/path/to/file")
|> Stream.map(&String.replace(&1, "#", "%"))
|> Stream.into(File.stream!("/path/to/other/file"))
|> Stream.run()
@spec scan(Enumerable.t(), (element(), acc() -> any())) :: Enumerable.t()
创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用枚举器中的第一个元素作为起始值。
示例
iex> stream = Stream.scan(1..5, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]
@spec scan(Enumerable.t(), acc(), (element(), acc() -> any())) :: Enumerable.t()
创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用给定的 acc
作为起始值。
示例
iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]
@spec take(Enumerable.t(), integer()) :: Enumerable.t()
延迟地从枚举器中获取接下来的 count
个元素,并停止枚举。
如果给出负数 count
,则将获取最后 count
个值。为此,将完全枚举集合,并在内存中保留最多 2 * count
个元素。当集合结束时,将执行最后 count
个元素。因此,在无限集合上使用负数 count
将永远不会返回。
示例
iex> stream = Stream.take(1..100, 5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
iex> stream = Stream.take(1..100, -5)
iex> Enum.to_list(stream)
[96, 97, 98, 99, 100]
iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5)
iex> Enum.to_list(stream)
[1, 2, 3, 1, 2]
@spec take_every(Enumerable.t(), non_neg_integer()) :: Enumerable.t()
创建一个流,该流从枚举器中获取每个第 nth
个元素。
第一个元素始终包含在内,除非 nth
为 0。
nth
必须是非负整数。
示例
iex> stream = Stream.take_every(1..10, 2)
iex> Enum.to_list(stream)
[1, 3, 5, 7, 9]
iex> stream = Stream.take_every([1, 2, 3, 4, 5], 1)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
iex> stream = Stream.take_every(1..1000, 0)
iex> Enum.to_list(stream)
[]
@spec take_while(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
延迟地获取枚举器中的元素,直到给定函数返回真值。
示例
iex> stream = Stream.take_while(1..100, &(&1 <= 5))
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec timer(timer()) :: Enumerable.t()
创建一个流,该流在 n
毫秒后发出一个值。
发出的值为 0
。此操作将在给定的时间内阻塞调用方,直到元素被流式传输。
示例
iex> Stream.timer(10) |> Enum.to_list()
[0]
@spec transform(Enumerable.t(), acc, fun) :: Enumerable.t() when fun: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), acc: any()
转换现有的流。
它期望一个累加器和一个函数,该函数接收两个参数:流元素和更新后的累加器。它必须返回一个元组,其中第一个元素是一个新流(通常是一个列表)或原子 :halt
,第二个元素是下一个元素要使用的累加器。
注意:此函数等效于 Enum.flat_map_reduce/3
,只是此函数在流处理完毕后不会返回累加器。
示例
Stream.transform/3
很有用,因为它可以用作实现此模块中定义的许多函数的基础。例如,我们可以实现 Stream.take(enum, n)
如下
iex> enum = 1001..9999
iex> n = 3
iex> stream = Stream.transform(enum, 0, fn i, acc ->
...> if acc < n, do: {[i], acc + 1}, else: {:halt, acc}
...> end)
iex> Enum.to_list(stream)
[1001, 1002, 1003]
Stream.transform/5
进一步概括了此函数,以允许包装资源。
@spec transform(Enumerable.t(), start_fun, reducer, after_fun) :: Enumerable.t() when start_fun: (-> acc), reducer: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), after_fun: (acc -> term()), acc: any()
类似于 Stream.transform/5
,只是没有提供 last_fun
。
此函数可以看作是 Stream.resource/3
与 Stream.transform/3
的组合。
@spec transform(Enumerable.t(), start_fun, reducer, last_fun, after_fun) :: Enumerable.t() when start_fun: (-> acc), reducer: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), last_fun: (acc -> {Enumerable.t(), acc} | {:halt, acc}), after_fun: (acc -> term()), acc: any()
使用基于函数的启动、最后一个和之后回调来转换现有的流。
一旦转换开始,就会调用 start_fun
来计算初始累加器。然后,对于枚举器中的每个元素,都会使用元素和累加器调用 reducer
函数,返回新的元素和新的累加器,就像在 transform/3
中一样。
一旦集合完成,就会使用累加器调用 last_fun
来发出任何剩余的项目。然后调用 after_fun
来关闭任何资源,但不发出任何新项目。只有当给定的枚举器成功终止(因为已完成或自行停止)时,才会调用 last_fun
。始终调用 after_fun
,因此 after_fun
必须是用于关闭资源的那个。
@spec unfold(acc(), (acc() -> {element(), acc()} | nil)) :: Enumerable.t()
为给定的累加器发出一个值序列。
通过使用前一个累加器调用 next_fun
来生成连续的值,它必须返回一个包含当前值和下一个累加器的元组。如果返回 nil
,则枚举结束。
示例
要创建一个向下计数并在零之前停止的流
iex> Stream.unfold(5, fn
...> 0 -> nil
...> n -> {n, n - 1}
...> end) |> Enum.to_list()
[5, 4, 3, 2, 1]
如果 next_fun
从不返回 nil
,则返回的流是*无限*的
iex> Stream.unfold(0, fn
...> n -> {n, n + 1}
...> end) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
iex> Stream.unfold(1, fn
...> n -> {n, n * 2}
...> end) |> Enum.take(10)
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
@spec uniq(Enumerable.t()) :: Enumerable.t()
创建一个流,该流只发出唯一的元素。
请记住,为了知道一个元素是否唯一,此函数需要存储流发出的所有唯一值。因此,如果流是无限的,则存储的元素数量将无限增长,永远不会被垃圾回收。
示例
iex> Stream.uniq([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
[1, 2, 3]
@spec uniq_by(Enumerable.t(), (element() -> term())) :: Enumerable.t()
创建一个流,该流只发出唯一的元素,通过删除函数 fun
返回重复元素的元素来实现。
函数 fun
将每个元素映射到一个项,该项用于确定两个元素是否重复。
请记住,为了知道一个元素是否唯一,此函数需要存储流发出的所有唯一值。因此,如果流是无限的,则存储的元素数量将无限增长,永远不会被垃圾回收。
示例
iex> Stream.uniq_by([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list()
[{1, :x}, {2, :y}]
iex> Stream.uniq_by([a: {:tea, 2}, b: {:tea, 2}, c: {:coffee, 1}], fn {_, y} -> y end) |> Enum.to_list()
[a: {:tea, 2}, c: {:coffee, 1}]
@spec with_index(Enumerable.t(), integer()) :: Enumerable.t()
创建一个流,其中枚举器中的每个元素都将与其索引一起封装在一个元组中。
如果给出 offset
,我们将从给定的偏移量而不是从零开始索引。
示例
iex> stream = Stream.with_index([1, 2, 3])
iex> Enum.to_list(stream)
[{1, 0}, {2, 1}, {3, 2}]
iex> stream = Stream.with_index([1, 2, 3], 3)
iex> Enum.to_list(stream)
[{1, 3}, {2, 4}, {3, 5}]
@spec zip(enumerables) :: Enumerable.t() when enumerables: [Enumerable.t()] | Enumerable.t()
将来自有限枚举器集合的对应元素压缩成一个元组流。
一旦给定集合中的任何枚举器完成,压缩就会结束。
示例
iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle(["foo", "bar", "baz"])
iex> Stream.zip([concat, [:a, :b, :c], cycle]) |> Enum.to_list()
[{1, :a, "foo"}, {2, :b, "bar"}, {3, :c, "baz"}]
@spec zip(Enumerable.t(), Enumerable.t()) :: Enumerable.t()
延迟地将两个枚举器压缩在一起。
一旦任何一个枚举器完成,压缩就会结束。
示例
iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle([:a, :b, :c])
iex> Stream.zip(concat, cycle) |> Enum.to_list()
[{1, :a}, {2, :b}, {3, :c}, {4, :a}, {5, :b}, {6, :c}]
@spec zip_with(enumerables, (Enumerable.t() -> term())) :: Enumerable.t() when enumerables: [Enumerable.t()] | Enumerable.t()
延迟地将来自有限枚举器集合的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun
函数对其进行转换。
来自 enumerables
中每个枚举的第一个元素将被放入一个列表中,然后传递给一元 zip_fun
函数。然后,来自每个枚举的第二个元素将被放入一个列表中并传递给 zip_fun
,以此类推,直到 enumerables
中的任何一个枚举完成。
返回一个新的枚举,其中包含调用 zip_fun
的结果。
示例
iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with([concat, concat], fn [a, b] -> a + b end) |> Enum.to_list()
[2, 4, 6, 8, 10, 12]
iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with([concat, concat, 1..3], fn [a, b, c] -> a + b + c end) |> Enum.to_list()
[3, 6, 9]
@spec zip_with(Enumerable.t(), Enumerable.t(), (term(), term() -> term())) :: Enumerable.t()
延迟地将两个枚举器中的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun
函数对其进行转换。
zip_fun
将使用 enumerable1
的第一个元素和 enumerable2
的第一个元素进行调用,然后使用每个元素的第二个元素进行调用,以此类推,直到任一枚举完成。
示例
iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with(concat, concat, fn a, b -> a + b end) |> Enum.to_list()
[2, 4, 6, 8, 10, 12]