Task.async в Elixir Stream
Я хочу сделать параллельную карту над большим списком. Код выглядит примерно так:
big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter filter_fun
Но я проверял реализацию Stream, и насколько я понимаю, Stream.map
объединяет функции и применяет комбинированную функцию к элементам в потоке, что означает, что последовательность выглядит так:
- Возьмите первый элемент
- Создать задачу async
- Подождите, пока он закончит.
- Возьмите второй элемент...
В этом случае он не делает это параллельно. Я прав, или я чего-то не хватает?
Если я прав, как насчет этого кода?
Stream.map Task.async ...
|> Enum.map Task.await ...
Это будет работать параллельно?
Ответы
Ответ 1
Второй также не делает то, что вы хотите. Вы можете ясно видеть это с помощью этого кода:
defmodule Test do
def test do
[1,2,3]
|> Stream.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
end
def job(number) do
:timer.sleep 1000
IO.inspect(number)
end
end
Test.test
Вы увидите число, затем 1 секунду ожидания, другое число и т.д. Ключевым моментом здесь является то, что вы хотите как можно скорее создать задачи, поэтому не следует использовать
ленивый Stream.map
вообще. Вместо этого используйте нетерпение Enum.map
в этой точке:
|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
С другой стороны, вы можете использовать Stream.map
при ожидании, до тех пор, пока вы выполняете какую-либо активную операцию позже, например, filter
. Таким образом, ожидания будут перемешаны с любой обработкой, которую вы могли бы делать по результатам.
Ответ 2
Elixir 1.4 предоставляет новую функцию Task.async_stream/5, которая будет возвращать поток, который одновременно запускает заданную функцию по каждому элементу в перечислим.
Существуют также опции для указания максимального числа рабочих и таймаута с использованием параметров параметров :max_concurrency
и :timeout
.
Это приведет к тому, что ваш пример будет запущен одновременно:
big_list
|> Task.async_stream(Module, :do_something, [&1])
|> Enum.filter(filter_fun)
Ответ 3
Вы можете попробовать Parallel Stream.
stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
stream |> Enum.into([])
[2,4,6,8,10,12,14,16,18,20]
UPD
Или лучше использовать Flow