Динамический граф вычислений во время выполнения с использованием Elixir Genstage
Я хотел бы иметь возможность динамически изменять конвейер вычислений во время выполнения, но, похоже, GenStage требует, чтобы вычислитель вычислялся во время компиляции с помощью механизма subscribe_to: [...]
. Есть ли способ создания динамических вычислительных графов? Например, ниже я хотел бы во время выполнения переключаться между вершинами "вычитать 7" и "вычитать 4" в моем конвейерном графе.
Возможно ли это с помощью GenStage? Скорее всего, у меня будут очень сложные конвейеры, поэтому мне нужно решение, которое масштабируется для изменения графиков сложными способами, в отличие от специальных решений, таких как, например, параметризация целого числа для вычитания. Я хотел бы иметь возможность добавлять или удалять целые поддеревья, переключаться между поддеревьями и добавлять узлы в график, в том числе вставляя их в середину любого поддерева, включая главное дерево.
Пожалуйста, обратитесь к редактировать дальше вниз
Вот начальный производитель:
defmodule GenstageTest.Producer do
use GenStage
def start_link(initial \\ 1) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(counter), do: {:producer, counter}
def handle_demand(demand, state) do
events = Enum.to_list(state..(state + demand - 1))
{:noreply, events, state + demand}
end
end
Вот один из производителей:
defmodule GenstageTest.PcTimesFive do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
{:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
end
def handle_events(events, _from, state) do
numbers =
events
|> Enum.map(&(&1 * 5))
{:noreply, numbers, state}
end
end
и вот конечный потребитель:
defmodule GenstageTest.Consumer do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter)
end
def init(state) do
{:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect({self(), event, state})
end
# As a consumer we never emit events
{:noreply, [], state}
end
end
я Все это смоделировано на основе учебника Elixir School Genstage.
Все модули и mix.exs можно найти на github.
РЕДАКТИРОВАТЬ 3 дня спустя после частичного ответа от @AquarHEAD L.
Мне удалось заставить работать подписки во время выполнения. Вот некоторые измененные производители, provider_consumers и потребители соответственно:
Режиссер:
defmodule GenstageTest.Producer do
use GenStage
def start_link(initial \\ 1) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(counter), do: {:producer, counter}
def handle_demand(demand, state) do
events = Enum.to_list(state..(state + demand - 1))
{:noreply, events, state + demand}
end
def handle_info({:doprint}, state) do
IO.puts "yep"
{:noreply, [], state}
end
def handle_info({:cancel, sublink}, state) do
GenStage.cancel sublink, []
{:noreply, [], state}
end
end
Producter_consumer:
defmodule GenstageTest.PcAddOne do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
{:producer_consumer, state}
end
def handle_events(events, _from, state) do
numbers =
events
|> Enum.map(&(&1 + 1))
{:noreply, numbers, state}
end
end
Потребитель:
defmodule GenstageTest.Consumer do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter)
end
def init(state) do
{:consumer, state}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect event
#File.write("/home/tbrowne/scratch/output.txt",
# Kernel.inspect(event) <> " ", [:append])
:timer.sleep(100)
end
# As a consumer we never emit events
{:noreply, [], state}
end
end
Теперь, когда все они доступны в каталоге lib (не забудьте добавить {:gen_stage, "~> 0.11"}
в вашу папку mix.exs) или скопировать и вставить в IEX, следующее будет отлично работать:
{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)
Проблема сейчас в том, что я до сих пор не знаю, как отменить подписку. Существует функция отмены, а также функция остановки. GenStage.stop(c)
похоже, ничего не делает, в то время как мои различные попытки GenStage.cancel/3
только дают ошибки.
Напомним, что сейчас мне нужно уметь останавливать определенные этапы и заменять их другими. Каков синтаксис отмены подписки и откуда она вызывается? Это не очень хорошо объяснено в документах, так как нет конкретного примера.
Ответы
Ответ 1
Вы можете абсолютно изменить конвейер во время выполнения, ознакомьтесь с первым примером в документации GenStage, вы также можете использовать :manual
режим для точного управления запросами. Там также API для отмены подписки. Я думаю, что этого достаточно для динамического управления конвейерами GenStage.
Ответ 2
Почему бы не реализовать свой собственный GenStage.Dispatcher
? Вот поведение