Способ равномерного сдвига буферизованных событий
То, что я пытаюсь достичь, - это буферизовать входящие события из некоторого IObservable (они входят в пакеты) и выпускать их дальше, но один за другим, даже с интервалом.
Вот так:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Так как я довольно новичок в Rx, я не уверен, что уже есть объект или оператор, который делает именно это. Может быть, это может быть сделано композицией?
обновление:
Благодаря
Richard Szalay для указания оператора Drain, я нашел еще один пример Джеймсом Майлсом использования Дренажа. Вот как мне удалось заставить его работать в WPF-приложении:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();
Мне было весело, потому что упущение параметра планировщика приводит к сбою приложения в режиме отладки без какого-либо исключения (мне нужно узнать, как справляться с исключениями в Rx).
Метод Process напрямую изменяет состояние пользовательского интерфейса, но, я думаю, довольно просто сделать из него IObservable (используя ISubject?).
обновление:
Тем временем я экспериментировал с ISubject, класс ниже делает то, что я хотел - он своевременно выпускает буферизованный Ts:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue)
{
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
}
if (!idle)
{
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
}
}
public void OnNext(T value)
{
lock (queue)
queue.Enqueue(value);
if (idle)
cancel.Disposable = scheduler.Schedule(Step);
}
public IDisposable Subscribe(IObserver<T> observer)
{
subscriber = observer;
return cancel;
}
}
Эта наивная реализация удалена из OnCompleted и OnError для ясности, а также разрешена только одна подписка.
Ответы
Ответ 1
На самом деле это трюк, чем кажется.
Использование Delay
не работает, потому что значения все равно будут выполняться в массе, только слегка задерживаются.
Использование Interval
с помощью CombineLatest
или Zip
не работает, так как первое приведет к пропущению значений исходного значения, а последнее - к значениям интервала буферизации.
Я думаю, что новый оператор Drain
(добавлен в 1.0.2787.0), в сочетании с Delay
должен сделать трюк:
source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
Оператор Drain
работает как SelectMany
, но ждет, пока предыдущий вывод не завершится до вызова селектора со следующим значением. Это еще не совсем то, что вам нужно (первое значение в блоке также будет отложено), но оно закрывается: Теперь использование теперь соответствует вашей мраморной диаграмме.
Изменить: По-видимому, Drain
в фреймворке не работает как SelectMany
. Я попрошу совета на официальных форумах. Тем временем, здесь реализация Drain, которая делает то, что вам нужно:
Редактировать 09/11: Исправлены ошибки в реализации и обновленном использовании в соответствии с запрошенной диаграммой мрамора.
public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}
Ответ 2
Только для полноты здесь есть альтернативная (более компактная) версия метода Drain(), предложенная Ричардом:
public static IObservable<T2> SelectManySequential<T1, T2>(
this IObservable<T1> source,
Func<T1, IObservable<T2>> selector
)
{
return source
.Select(x => Observable.Defer<T2>(() => selector(x)))
.Concat();
}
См. поток Drain + SelectMany =? в форуме Rx.
Update:
Я понял, что перегрузка Concat(), которую я использовал, была одним из моих личных расширений Rx, которые (еще не) являются частью фреймворка. Я сожалею об этой ошибке. Конечно, это делает мое решение менее элегантным, чем я думал.
Тем не менее для полноты я размещаю здесь мой метод перегрузки расширения Conact():
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
return Observable.CreateWithDisposable<T>(o =>
{
var lockCookie = new Object();
bool completed = false;
bool subscribed = false;
var waiting = new Queue<IObservable<T>>();
var pendingSubscription = new MutableDisposable();
Action<Exception> errorHandler = e =>
{
o.OnError(e);
pendingSubscription.Dispose();
};
Func<IObservable<T>, IDisposable> subscribe = null;
subscribe = (ob) =>
{
subscribed = true;
return ob.Subscribe(
o.OnNext,
errorHandler,
() =>
{
lock (lockCookie)
{
if (waiting.Count > 0)
pendingSubscription.Disposable = subscribe(waiting.Dequeue());
else if (completed)
o.OnCompleted();
else
subscribed = false;
}
}
);
};
return new CompositeDisposable(pendingSubscription,
source.Subscribe(
n =>
{
lock (lockCookie)
{
if (!subscribed)
pendingSubscription.Disposable = subscribe(n);
else
waiting.Enqueue(n);
}
},
errorHandler
, () =>
{
lock (lockCookie)
{
completed = true;
if (!subscribed)
o.OnCompleted();
}
}
)
);
});
}
И теперь избивая себя собственным оружием:
Тот же метод Concat() может быть написан гораздо более изящным в Ричарде Салае блестящим способом:
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v =>
v.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
Так что кредит принадлежит Ричарду.: -)
Ответ 3
Вот как я это сделал, просто используя явную очередь (ReactiveCollection - просто причудливая версия WPF ObservableCollection - ReactiveCollection.ItemsAdded OnNext для каждого добавленного элемента, как вы можете себе представить):
https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309
public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
{
var ret = new ReactiveCollection<T>();
if (WithDelay == null) {
FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
return ret;
}
// On a timer, dequeue items from queue if they are available
var queue = new Queue<T>();
var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
.ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
if (queue.Count > 0) {
ret.Add(queue.Dequeue());
}
});
// When new items come in from the observable, stuff them in the queue.
// Using the DeferredScheduler guarantees we'll always access the queue
// from the same thread.
FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);
// This is a bit clever - keep a running count of the items actually
// added and compare them to the final count of items provided by the
// Observable. Combine the two values, and when they're equal,
// disconnect the timer
ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1),
(l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());
return ret;
}