Как изменить реализацию Rx Builder, чтобы исправить исключение?
Я пытаюсь создать Rx Builder для использования Reactive Extension в синтаксисе выражения F # Computation Expression. Как исправить это, чтобы он не взорвал стек? Как пример Seq ниже.
И есть ли какие-либо планы по обеспечению реализации RxBuilder как части Reactive Extensions или как часть будущих версий .NET Framework?
open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =
member this.Delay f = Observable.Defer f
member this.Combine (xs: IObservable<_>, ys : IObservable<_>) =
Observable.merge xs ys
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x
yield! f (x + 1) }
let rec g x = rx { yield x
yield! g (x + 1) }
//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore
Ответы
Ответ 1
Короткий ответ заключается в том, что Rx Framework не поддерживает генерацию наблюдаемых, используя рекурсивный шаблон, подобный этому, поэтому его нелегко сделать. Операция Combine
, используемая для последовательностей F #, требует некоторой специальной обработки, которую наблюдаемые не предоставляют. Rx Framework, вероятно, ожидает, что вы будете генерировать наблюдаемые с помощью Observable.Generate
, а затем использовать LINQ query/F # computation builder для их обработки.
В любом случае, вот некоторые мысли -
Прежде всего, вам нужно заменить Observable.merge
на Observable.Concat
. Первый запускает оба наблюдаемых параллельно, а второй сначала дает все значения из первого наблюдаемого, а затем производит значения из второго наблюдаемого. После этого изменения фрагмент, по крайней мере, напечатает ~ 800 номеров до.
Причина заключается в том, что Concat
создает наблюдаемое, которое вызывает Concat
, чтобы создать другое наблюдаемое, которое вызывает Concat
и т.д. Один из способов решения этой проблемы - добавить некоторую синхронизацию. Если вы используете Windows Forms, вы можете изменить Delay
так, чтобы он планировал наблюдаемое в потоке GUI (который отбрасывает текущий стек). Вот эскиз:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
Чтобы реализовать это правильно, вам придется написать свой собственный метод Concat
, что довольно сложно. Идея заключалась бы в следующем:
- Concat возвращает некоторый специальный тип, например.
IConcatenatedObservable
- Когда метод вызывается рекурсивно, вы создадите цепочку
IConcatenatedObservable
, ссылающуюся друг на друга
- Метод
Concat
будет искать эту цепочку и когда есть, например, три объекта, он опустит среднюю (чтобы всегда поддерживать цепочку длины не более 2).
Это слишком сложно для ответа StackOverflow, но это может быть полезной обратной связью для команды Rx.
Ответ 2
Обратите внимание, что это исправлено в Rx v2.0 (как уже упоминалось здесь), в более общем плане для всех операторов секвенирования (Concat, Catch, OnErrorResumeNext), а также императивных операторов (If, While и т.д.),.
В принципе, вы можете думать об этом классе операторов как о том, чтобы подписаться на другую последовательность в сообщении о терминальном наблюдателе (например, Concat подписывается на следующую последовательность после получения текущего сообщения OnCompleted), в котором появляется аналогия хвостовой рекурсии в.
В Rx v2.0 все хвостовые рекурсивные подписки сглаживаются в структуру данных, подобную очереди, для обработки по одному, разговаривая с наблюдателем в нисходящем направлении. Это позволяет избежать неограниченного роста наблюдателей, разговаривающих друг с другом для последовательных подписок.
Ответ 3
Это исправлено в Rx 2.0 Beta. И здесь test.
Ответ 4
Как насчет этого?
type rxBuilder() =
member this.Delay (f : unit -> 'a IObservable) =
{ new IObservable<_> with
member this.Subscribe obv = (f()).Subscribe obv }
member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
{ new IObservable<_> with
member this.Subscribe obv = xs.Subscribe obv ;
ys.Subscribe obv }
member this.Yield x = Observable.Return x
member this.YieldFrom xs = xs
let rx = rxBuilder()
let rec f x = rx { yield x
yield! f (x + 1) }
do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore
do System.Console.ReadLine() |> ignore
http://rxbuilder.codeplex.com/ (созданный для экспериментов с RxBuilder)
Одноразовый xs не подключен. Как только я попытаюсь подключить одноразовое устройство, он возвращается, чтобы взорвать стек.
Ответ 5
Если мы удалим синтаксический сахар из этого выражения вычисления (aka Monad), мы получим:
let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )
Или в С#:
public static IObservable<int> g(int x)
{
return Observable.Defer<int>(() =>
{
return Observable.Merge(Observable.Return(x), g(x + 1));
});
}
Это определенно не хвостовая рекурсивность. Я думаю, что если вы можете сделать его хвостом рекурсивным, то это, вероятно, решит вашу проблему.