Ответ 1
Основными правилами Supply
являются:
- Без введения параллелизма без явного запроса
- Противодавление через модель отправителя-платит
- Сообщение обрабатывается полностью до следующего (так что
.map({...something with state... })
можно доверять, чтобы не вызвать конфликты над состоянием)
Правило 3 на самом деле не применяется к share
поскольку после этого происходит отдельная цепочка операций после операции, но правила 1 и 2 действительно выполняются. Целью share
является публикация/подписка, а также предоставление повторного использования части обработки несколькими обработчиками сообщений ниже по потоку. Внедрение параллельной обработки сообщений - это отдельная проблема.
Существуют различные варианты. Один из них - иметь сообщения для параллельной обработки, вставленные в Channel
. Это явно вводит место для буферизации сообщений (ну, пока у вас не закончится память... именно поэтому Supply
поставляется с моделью обратного давления с отправителем). Принуждение Channel
обратно в Supply
получает значения, вытащенные из Channel
и испускаемые этим источником Supply
в потоке бассейна. Это выглядит так:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Обратите внимание, что поскольку whenever
автоматически применяется то, на что он просил реагировать на Supply
, тогда это будет выглядеть whenever $supply.Channel { }
, что делает его довольно кратким решением, - но в то же время красиво явным в том whenever $supply.Channel { }
, что оно указывает как работает механизм обратного давления. Другим свойством этого решения является то, что он сохраняет порядок сообщений и по-прежнему дает обработку в один момент времени после Channel
.
Альтернативой является реагировать на каждое сообщение, вместо этого запуская какую-то асинхронную часть работы для ее обработки. start
операция в Supply
рассылает блок, который передается для запуска в пуле потоков для каждого принятого сообщения, что не блокирует приход следующего сообщения. Результатом является Supply
Supply
. Это заставляет нажимать на каждый внутренний Supply
чтобы на самом деле что-то произошло, что вначале кажется слегка противоречащим интуиции, но на самом деле это полезно для программиста: он дает понять, что есть дополнительный бит асинхронной работы для отслеживания. Я настоятельно рекомендую использовать это в сочетании с синтаксисом " react
/whenever
, который автоматически выполняет управление подпиской и распространение ошибок. Наиболее прямое преобразование кода в вопрос:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Хотя также можно написать вместо этого:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Что указывает на возможность написания parallelize
Supply
:
my $supply = Supply.interval(0.2).share;
my $tap = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
sub parallelize(Supply $messages, &operation) {
supply {
whenever $messages -> $value {
whenever start operation($value) {
emit $_;
}
}
}
}
Выход этого подхода сильно отличается от первого Channel
, поскольку все операции запускаются, как только приходит сообщение. Также он не сохраняет порядок сообщений. Там все еще есть неявная очередь (в отличие от явной с подходом " Channel
), это просто то, что теперь очередь работы с планировщиком пула потоков и планировщик ОС, который должен отслеживать незавершенную работу. И опять же, нет противодавления, но обратите внимание, что было бы вполне возможно реализовать это, отслеживая выдающиеся Promises
и блокируя дальнейшие входящие сообщения с await Promise.anyof(@outstanding)
.
Наконец, я хотел бы отметить, что race whenever
построении race whenever
либо рассматривался вопрос о hyper whenever
чтобы обеспечить некоторый механизм языкового уровня для параллельной обработки сообщений о Supply
. Однако семантика таких, и как они играют в supply
-block цели проектирования и свойство безопасности, представляют собой значительные проблемы проектирования.