Как изменить реализацию Rx Builder, чтобы исправить исключение?

Я пытаюсь создать Rx Builder для использования Reactive Extension в синтаксисе выражения F # Computation Expression. Как исправить это, чтобы он не взорвал стек? Как пример Seq ниже. И есть ли какие-либо планы по обеспечению реализации RxBuilder как части Reactive Extensions или как часть будущих версий .NET Framework?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

Ответы

Ответ 1

Короткий ответ заключается в том, что Rx Framework не поддерживает генерацию наблюдаемых, используя рекурсивный шаблон, подобный этому, поэтому его нелегко сделать. Операция Combine, используемая для последовательностей F #, требует некоторой специальной обработки, которую наблюдаемые не предоставляют. Rx Framework, вероятно, ожидает, что вы будете генерировать наблюдаемые с помощью Observable.Generate, а затем использовать LINQ query/F # computation builder для их обработки.

В любом случае, вот некоторые мысли -

Прежде всего, вам нужно заменить Observable.merge на Observable.Concat. Первый запускает оба наблюдаемых параллельно, а второй сначала дает все значения из первого наблюдаемого, а затем производит значения из второго наблюдаемого. После этого изменения фрагмент, по крайней мере, напечатает ~ 800 номеров до.

Причина заключается в том, что Concat создает наблюдаемое, которое вызывает Concat, чтобы создать другое наблюдаемое, которое вызывает Concat и т.д. Один из способов решения этой проблемы - добавить некоторую синхронизацию. Если вы используете Windows Forms, вы можете изменить Delay так, чтобы он планировал наблюдаемое в потоке GUI (который отбрасывает текущий стек). Вот эскиз:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

Чтобы реализовать это правильно, вам придется написать свой собственный метод Concat, что довольно сложно. Идея заключалась бы в следующем:

  • Concat возвращает некоторый специальный тип, например. IConcatenatedObservable
  • Когда метод вызывается рекурсивно, вы создадите цепочку IConcatenatedObservable, ссылающуюся друг на друга
  • Метод Concat будет искать эту цепочку и когда есть, например, три объекта, он опустит среднюю (чтобы всегда поддерживать цепочку длины не более 2).

Это слишком сложно для ответа StackOverflow, но это может быть полезной обратной связью для команды Rx.

Ответ 2

Обратите внимание, что это исправлено в Rx v2.0 (как уже упоминалось здесь), в более общем плане для всех операторов секвенирования (Concat, Catch, OnErrorResumeNext), а также императивных операторов (If, While и т.д.),.

В принципе, вы можете думать об этом классе операторов как о том, чтобы подписаться на другую последовательность в сообщении о терминальном наблюдателе (например, Concat подписывается на следующую последовательность после получения текущего сообщения OnCompleted), в котором появляется аналогия хвостовой рекурсии в.

В Rx v2.0 все хвостовые рекурсивные подписки сглаживаются в структуру данных, подобную очереди, для обработки по одному, разговаривая с наблюдателем в нисходящем направлении. Это позволяет избежать неограниченного роста наблюдателей, разговаривающих друг с другом для последовательных подписок.

Ответ 3

Это исправлено в Rx 2.0 Beta. И здесь test.

Ответ 4

Как насчет этого?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/ (созданный для экспериментов с RxBuilder)

Одноразовый xs не подключен. Как только я попытаюсь подключить одноразовое устройство, он возвращается, чтобы взорвать стек.

Ответ 5

Если мы удалим синтаксический сахар из этого выражения вычисления (aka Monad), мы получим:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

Или в С#:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

Это определенно не хвостовая рекурсивность. Я думаю, что если вы можете сделать его хвостом рекурсивным, то это, вероятно, решит вашу проблему.