Существует ли метод Rx для повторного воспроизведения предыдущего значения, когда никакие значения не поступают?
Случай использования, с которым я столкнулся, и я подозреваю, что не могу быть единственным, для метода вроде:
IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);
который вернет все будущие элементы из внутреннего наблюдаемого, но также, если внутренний наблюдаемый не вызывает OnNext в течение определенного периода времени (maxQuietPeriod), он просто повторяет последнее значение (пока, конечно, внутренние вызовы OnCompleted или OnError).
Было бы оправдано, что служба периодически выдавала периодическое обновление статуса. Например:
var myStatus = Observable.FromEvent(
h=>this.StatusUpdate+=h,
h=>this.StatusUpdate-=h);
var messageBusStatusPinger = myStatus
.RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
.Subscribe(update => _messageBus.Send(update));
Есть ли что-то подобное? Или я переоцениваю его полезность?
Спасибо,
Alex
PS: Я прошу прощения за любую неправильную терминологию/синтаксис, так как я просто изучаю Rx впервые.
Ответы
Ответ 1
Аналогичное решение для Мэтью, но здесь таймер начинается после того, как каждый элемент получен в источнике, что я считаю более правильным (однако различия вряд ли имеют значение):
public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
return inner.Select(x =>
Observable.Interval(maxQuietPeriod)
.Select(_ => x)
.StartWith(x)
).Switch();
}
И тест:
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
.Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));
source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);
Вы должны увидеть 1
напечатано 10 раз (5 из источника, 5 повторяется во время тишины), затем много 2
по мере того, как вы получаете источник из источника и еще 4 от молчания между ними, за которым следует бесконечное 3
.
Ответ 2
Этот довольно простой запрос выполняет задание:
var query =
source
.Select(s =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.StartWith(s)
.Select(x => s))
.Switch();
Никогда не недооценивайте силу .Switch()
.
Ответ 3
Я думаю, что это делает то, что вы хотите, если ваш наблюдаемый не горячий, вам нужно Publish
и Refcount
его:
public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
var throttled = inner.Throttle(maxQuietPeriod);
var repeating = throttled.SelectMany(i =>
Observable
.Interval(maxQuietPeriod)
.Select(_ => i)
.TakeUntil(inner));
return Observable.Merge(inner, throttled, repeating);
}
Ответ 4
В библиотеках Rx нет метода, но мне также нужен такой метод. В моем случае использования мне нужно было выводить значения, даже если источник не выдает никаких значений. Если вы не хотите выставлять какие-либо значения до тех пор, пока не поступит первое значение источника, вы можете удалить параметр defaultValue
и вызов createTimer()
перед вызовом подписки.
Планировщик необходим для запуска таймера. Очевидной перегрузкой будет тот, который не принимает планировщик и выбирает планировщик по умолчанию (я использовал планировщик ThreadPool).
Imports System.Reactive
Imports System.Reactive.Concurrency
Imports System.Reactive.Disposables
Imports System.Reactive.Linq
<Extension()>
Public Function AtLeastEvery(Of T)(source As IObservable(Of T),
timeout As TimeSpan,
defaultValue As T,
scheduler As IScheduler
) As IObservable(Of T)
If source Is Nothing Then Throw New ArgumentNullException("source")
If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler")
Return Observable.Create(
Function(observer As IObserver(Of T))
Dim id As ULong
Dim gate As New Object()
Dim timer As New SerialDisposable()
Dim lastValue As T = defaultValue
Dim createTimer As Action =
Sub()
Dim startId As ULong = id
timer.Disposable = scheduler.Schedule(timeout,
Sub(self As Action(Of TimeSpan))
Dim noChange As Boolean
SyncLock gate
noChange = (id = startId)
If noChange Then
observer.OnNext(lastValue)
End If
End SyncLock
'only restart if no change, otherwise
'the change restarted the timeout
If noChange Then self(timeout)
End Sub)
End Sub
'start the first timeout
createTimer()
'subscribe to the source observable
Dim subscription = source.Subscribe(
Sub(v)
SyncLock gate
id += 1UL
lastValue = v
End SyncLock
observer.OnNext(v)
createTimer() 'reset the timeout
End Sub,
Sub(ex)
SyncLock gate
id += 1UL
End SyncLock
observer.OnError(ex)
'do not reset the timeout, because the sequence has ended
End Sub,
Sub()
SyncLock gate
id += 1UL
End SyncLock
observer.OnCompleted()
'do not reset the timeout, because the sequence has ended
End Sub)
Return New CompositeDisposable(timer, subscription)
End Function)
End Function