Идиоматические двунаправленные трубы с нисходящим состоянием без потерь

Скажем, у меня простая модель производителя/потребителя, где потребитель хочет передать некоторое состояние производителю. Например, пусть объекты, идущие вниз по потоку, являются объектами, которые мы хотим записать в файл, а восходящие объекты - это некоторый токен, представляющий, где объект был записан в файле (например, смещение).

Эти два процесса могут выглядеть примерно так (с pipes-4.0),

{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State       
import Control.Monad

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
  where go = do i <- get
                obj <- lift $ request i
                lift $ lift $ putStrLn $ "Wrote "++show obj
                modify (+1)

produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where go [] = return ()
        go (obj:rest) = do
            lift $ putStrLn $ "Producing "++show obj
            objId <- respond obj
            lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
            go rest

objects = [ Obj i | i <- [0..10] ]

Простым, как это могло бы быть, у меня была небольшая трудность в рассуждениях о том, как их составлять. В идеале нам нужен потоковый контроль управления, как показано ниже,

  • writeObjects начинается с блокировки на request, отправив исходный ObjId 0 вверх по течению.
  • produceObjects отправляет первый объект Obj 0, downstream
  • writeObjects записывает объект и увеличивает его состояние и ждет request, на этот раз отправив ObjId 1 upstream
  • respond в produceObjects возвращается с ObjId 0
  • produceObjects продолжается на этапе (2) со вторым объектом, Obj 1

Моя первоначальная попытка заключалась в том, что композиция на основе нажатия следующая,

main = void $ run $ produceObjects objects >>~ const writeObjects

Обратите внимание на использование const для работы с другими несовместимыми типами (это, вероятно, проблема). В этом случае, однако, мы обнаруживаем, что ObjId 0 съедается,

Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...

Потянутый подход,

main = void $ run $ const (produceObjects objects) +>> writeObjects

имеет аналогичную проблему, на этот раз снижая Obj 0.

Как можно сочинять эти фрагменты желаемым образом?

Ответы

Ответ 1

Выбор какой композиции для использования зависит от того, какой компонент должен инициировать весь процесс. Если вы хотите, чтобы нисходящий канал инициировал процесс, вы хотите использовать композицию на основе pull (т.е. (>+>)/(+>>)), но если вы хотите, чтобы восходящий канал инициировал процесс, вам следует использовать push-based состав (т.е. (>>~)/(>~>)). Ошибки типа, которые вы получили, на самом деле предупреждали вас о том, что в вашем коде есть логическая ошибка: вы четко не определили, какой компонент инициирует процесс в первую очередь.

Из вашего описания очевидно, что вы хотите, чтобы поток управления начинался с produceObjects, поэтому вы хотите использовать push-based композицию. Когда вы используете композицию на основе push, тип оператора композиции скажет вам все, что вам нужно знать о том, как исправить ваш код. Я возьму его тип и специализируюсь на вашей цепочке композиций:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types
(>>~) :: Server ObjectId Object IO ()
      -> (Object -> Client ObjectId Object IO ())
      -> Effect IO ()

Как вы уже заметили, ошибка типа, которую вы получили при попытке использовать (>>~), сообщила вам, что вам не хватает аргумента типа Object для вашей функции writeObjects. Это статически предусматривает, что вы не можете запускать какой-либо код в writeObjects до получения первого Object (через начальный аргумент).

Решение состоит в том, чтобы переписать вашу функцию writeObjects следующим образом:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where go obj = do i <- get
                    lift $ lift $ putStrLn $ "Wrote "++ show obj
                    modify (+1)
                    obj' <- lift $ request i
                    go obj'

Это дает правильное поведение:

>>> run $ produceObjects objects >>~ writeObjects
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 0
Producing Obj 1
Wrote Obj 1
Object Obj 1 has ID ObjId 1
Producing Obj 2
Wrote Obj 2
Object Obj 2 has ID ObjId 2
Producing Obj 3
Wrote Obj 3
Object Obj 3 has ID ObjId 3
Producing Obj 4
Wrote Obj 4
Object Obj 4 has ID ObjId 4
Producing Obj 5
Wrote Obj 5
Object Obj 5 has ID ObjId 5
Producing Obj 6
Wrote Obj 6
Object Obj 6 has ID ObjId 6
Producing Obj 7
Wrote Obj 7
Object Obj 7 has ID ObjId 7
Producing Obj 8
Wrote Obj 8
Object Obj 8 has ID ObjId 8
Producing Obj 9
Wrote Obj 9
Object Obj 9 has ID ObjId 9
Producing Obj 10
Wrote Obj 10
Object Obj 10 has ID ObjId 10

Вы можете задаться вопросом, почему это требование о том, что одна из двух труб принимает начальный аргумент, имеет смысл, кроме абстрактного обоснования того, что это требует законов категории. Простое объяснение английского заключается в том, что альтернативой является то, что вам понадобится буфер, первый переданный Object "между" двумя трубами до writeObjects достиг своего первого оператора request. Такой подход создает много проблемных ситуаций и ошибок в работе с ошибками, но, вероятно, самая значительная проблема заключается в том, что состав труб больше не будет ассоциативным, и порядок эффектов будет меняться в зависимости от порядка, в котором вы сочинили вещи.

Хорошая вещь о операторах составления двунаправленных труб заключается в том, что типы работают так, что вы всегда можете определить, является ли компонент "активным" (т.е. инициирует управление) или "пассивным" (т.е. ожидает ввода) чисто изучая тип. Если композиция говорит, что определенная труба (например, writeObjects) должна принимать аргумент, то она пассивна. Если он не принимает никаких аргументов (например, produceObjects), он активен и инициирует управление. Таким образом, состав заставляет вас иметь не более одной активной трубы внутри вашего конвейера (труба, которая не принимает начальный аргумент) и что труба, которая начинает управлять.

Ответ 2

"const" - это то, где вы удаляете данные. Чтобы получить все данные, вы, вероятно, захотите сделать рабочий процесс на основе push следующим образом:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj = go 0 obj
  where
    go objid obj = do
        lift $ putStrLn $ "Wrote "++show obj
        obj' <- request objid
        go (objid + 1) obj'

-- produceObjects as before

main = void $ run $ produceObjects objects >>~ writeObjects

Ответ 3

Мы обсуждали это в списке рассылки, но я подумал, что я брошу его сюда и для тех, кто заинтересован.

Ваша проблема в том, что у вас есть две сопрограммы, которые готовы выплевывать ценности друг на друга. Для получения значения ни один из них не требует ввода другого. Итак, кто первый? Ну, вы сами это сказали:

writeObjects начинается с блокировки по запросу, отправив исходный ObjId 0 upstream

Итак, это означает, что нам нужно задержать produceObjects так, чтобы он ожидал сигнал ObjId, прежде чем выплескивать соответствующий объект (хотя, по-видимому, он не нуждается в указанном ID).

Погружаясь в внутренние прокси-серверы, вот волшебное заклинание, которое я не буду подробно объяснять в это время. Основная идея состоит в том, чтобы просто вводить ввод до того, как он вам понадобится, а затем применить ввод, когда это необходимо, но затем притворяться, будто вам нужен новый вход (хотя вам это еще не нужно):

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r
delayD p0 b' = case p0 of
    Request a' f -> Request a' (go . f)
    Respond b  g -> Respond b  (delayD (g b'))
    M m          -> M (liftM go m)
    Pure r       -> Pure r
  where
    go p = delayD p b'

Теперь вы можете использовать это на produceObjects objects вместо const, и ваша вторая попытка работает по желанию:

delayD (produceObjects objects) +>> writeObjects

Мы обсуждаем delayD в списке рассылки, чтобы узнать, заслуживает ли это включения в стандартный репертуар Pipes.