Идиоматические двунаправленные трубы с нисходящим состоянием без потерь
Скажем, у меня простая модель производителя/потребителя, где потребитель хочет передать некоторое состояние производителю. Например, пусть объекты, идущие вниз по потоку, являются объектами, которые мы хотим записать в файл, а восходящие объекты - это некоторый токен, представляющий, где объект был записан в файле (например, смещение).
Эти два процесса могут выглядеть примерно так (с pipes-4.0
),
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Pipes
import Pipes.Core
import Control.Monad.Trans.State
import Control.Monad
newtype Object = Obj Int
deriving (Show)
newtype ObjectId = ObjId Int
deriving (Show, Num)
writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
where go = do i <- get
obj <- lift $ request i
lift $ lift $ putStrLn $ "Wrote "++show obj
modify (+1)
produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
where go [] = return ()
go (obj:rest) = do
lift $ putStrLn $ "Producing "++show obj
objId <- respond obj
lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
go rest
objects = [ Obj i | i <- [0..10] ]
Простым, как это могло бы быть, у меня была небольшая трудность в рассуждениях о том, как их составлять. В идеале нам нужен потоковый контроль управления, как показано ниже,
-
writeObjects
начинается с блокировки на request
, отправив исходный ObjId 0
вверх по течению.
-
produceObjects
отправляет первый объект Obj 0
, downstream
-
writeObjects
записывает объект и увеличивает его состояние и ждет request
, на этот раз отправив ObjId 1
upstream
-
respond
в produceObjects
возвращается с ObjId 0
-
produceObjects
продолжается на этапе (2) со вторым объектом, Obj 1
Моя первоначальная попытка заключалась в том, что композиция на основе нажатия следующая,
main = void $ run $ produceObjects objects >>~ const writeObjects
Обратите внимание на использование const
для работы с другими несовместимыми типами (это, вероятно, проблема). В этом случае, однако, мы обнаруживаем, что ObjId 0
съедается,
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...
Потянутый подход,
main = void $ run $ const (produceObjects objects) +>> writeObjects
имеет аналогичную проблему, на этот раз снижая Obj 0
.
Как можно сочинять эти фрагменты желаемым образом?
Ответы
Ответ 1
Выбор какой композиции для использования зависит от того, какой компонент должен инициировать весь процесс. Если вы хотите, чтобы нисходящий канал инициировал процесс, вы хотите использовать композицию на основе pull (т.е. (>+>)
/(+>>)
), но если вы хотите, чтобы восходящий канал инициировал процесс, вам следует использовать push-based состав (т.е. (>>~)
/(>~>)
). Ошибки типа, которые вы получили, на самом деле предупреждали вас о том, что в вашем коде есть логическая ошибка: вы четко не определили, какой компонент инициирует процесс в первую очередь.
Из вашего описания очевидно, что вы хотите, чтобы поток управления начинался с produceObjects
, поэтому вы хотите использовать push-based композицию. Когда вы используете композицию на основе push, тип оператора композиции скажет вам все, что вам нужно знать о том, как исправить ваш код. Я возьму его тип и специализируюсь на вашей цепочке композиций:
-- Here I'm using the `Server` and `Client` type synonyms to simplify the types
(>>~) :: Server ObjectId Object IO ()
-> (Object -> Client ObjectId Object IO ())
-> Effect IO ()
Как вы уже заметили, ошибка типа, которую вы получили при попытке использовать (>>~)
, сообщила вам, что вам не хватает аргумента типа Object
для вашей функции writeObjects
. Это статически предусматривает, что вы не можете запускать какой-либо код в writeObjects
до получения первого Object
(через начальный аргумент).
Решение состоит в том, чтобы переписать вашу функцию writeObjects
следующим образом:
writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
where go obj = do i <- get
lift $ lift $ putStrLn $ "Wrote "++ show obj
modify (+1)
obj' <- lift $ request i
go obj'
Это дает правильное поведение:
>>> run $ produceObjects objects >>~ writeObjects
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 0
Producing Obj 1
Wrote Obj 1
Object Obj 1 has ID ObjId 1
Producing Obj 2
Wrote Obj 2
Object Obj 2 has ID ObjId 2
Producing Obj 3
Wrote Obj 3
Object Obj 3 has ID ObjId 3
Producing Obj 4
Wrote Obj 4
Object Obj 4 has ID ObjId 4
Producing Obj 5
Wrote Obj 5
Object Obj 5 has ID ObjId 5
Producing Obj 6
Wrote Obj 6
Object Obj 6 has ID ObjId 6
Producing Obj 7
Wrote Obj 7
Object Obj 7 has ID ObjId 7
Producing Obj 8
Wrote Obj 8
Object Obj 8 has ID ObjId 8
Producing Obj 9
Wrote Obj 9
Object Obj 9 has ID ObjId 9
Producing Obj 10
Wrote Obj 10
Object Obj 10 has ID ObjId 10
Вы можете задаться вопросом, почему это требование о том, что одна из двух труб принимает начальный аргумент, имеет смысл, кроме абстрактного обоснования того, что это требует законов категории. Простое объяснение английского заключается в том, что альтернативой является то, что вам понадобится буфер, первый переданный Object
"между" двумя трубами до writeObjects
достиг своего первого оператора request
. Такой подход создает много проблемных ситуаций и ошибок в работе с ошибками, но, вероятно, самая значительная проблема заключается в том, что состав труб больше не будет ассоциативным, и порядок эффектов будет меняться в зависимости от порядка, в котором вы сочинили вещи.
Хорошая вещь о операторах составления двунаправленных труб заключается в том, что типы работают так, что вы всегда можете определить, является ли компонент "активным" (т.е. инициирует управление) или "пассивным" (т.е. ожидает ввода) чисто изучая тип. Если композиция говорит, что определенная труба (например, writeObjects
) должна принимать аргумент, то она пассивна. Если он не принимает никаких аргументов (например, produceObjects
), он активен и инициирует управление. Таким образом, состав заставляет вас иметь не более одной активной трубы внутри вашего конвейера (труба, которая не принимает начальный аргумент) и что труба, которая начинает управлять.
Ответ 2
"const" - это то, где вы удаляете данные. Чтобы получить все данные, вы, вероятно, захотите сделать рабочий процесс на основе push следующим образом:
writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj = go 0 obj
where
go objid obj = do
lift $ putStrLn $ "Wrote "++show obj
obj' <- request objid
go (objid + 1) obj'
-- produceObjects as before
main = void $ run $ produceObjects objects >>~ writeObjects
Ответ 3
Мы обсуждали это в списке рассылки, но я подумал, что я брошу его сюда и для тех, кто заинтересован.
Ваша проблема в том, что у вас есть две сопрограммы, которые готовы выплевывать ценности друг на друга. Для получения значения ни один из них не требует ввода другого. Итак, кто первый? Ну, вы сами это сказали:
writeObjects
начинается с блокировки по запросу, отправив исходный ObjId 0
upstream
Итак, это означает, что нам нужно задержать produceObjects
так, чтобы он ожидал сигнал ObjId
, прежде чем выплескивать соответствующий объект (хотя, по-видимому, он не нуждается в указанном ID).
Погружаясь в внутренние прокси-серверы, вот волшебное заклинание, которое я не буду подробно объяснять в это время. Основная идея состоит в том, чтобы просто вводить ввод до того, как он вам понадобится, а затем применить ввод, когда это необходимо, но затем притворяться, будто вам нужен новый вход (хотя вам это еще не нужно):
delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r
delayD p0 b' = case p0 of
Request a' f -> Request a' (go . f)
Respond b g -> Respond b (delayD (g b'))
M m -> M (liftM go m)
Pure r -> Pure r
where
go p = delayD p b'
Теперь вы можете использовать это на produceObjects objects
вместо const
, и ваша вторая попытка работает по желанию:
delayD (produceObjects objects) +>> writeObjects
Мы обсуждаем delayD
в списке рассылки, чтобы узнать, заслуживает ли это включения в стандартный репертуар Pipes.