Rx: как я могу ответить немедленно, и обрабатывать последующие запросы
Я хотел бы настроить Rx-подписку, которая может сразу ответить на событие, а затем игнорировать последующие события, которые происходят в течение указанного периода "кулдауна".
Входящие в комплект поставки методы Throttle/Buffer отвечают только после истечения времени ожидания, что не совсем то, что мне нужно.
Вот какой код, который устанавливает сценарий, и использует Throttle (который не является решением, которое я хочу):
class Program
{
static Stopwatch sw = new Stopwatch();
static void Main(string[] args)
{
var subject = new Subject<int>();
var timeout = TimeSpan.FromMilliseconds(500);
subject
.Throttle(timeout)
.Subscribe(DoStuff);
var factory = new TaskFactory();
sw.Start();
factory.StartNew(() =>
{
Console.WriteLine("Batch 1 (no delay)");
subject.OnNext(1);
});
factory.StartNewDelayed(1000, () =>
{
Console.WriteLine("Batch 2 (1s delay)");
subject.OnNext(2);
});
factory.StartNewDelayed(1300, () =>
{
Console.WriteLine("Batch 3 (1.3s delay)");
subject.OnNext(3);
});
factory.StartNewDelayed(1600, () =>
{
Console.WriteLine("Batch 4 (1.6s delay)");
subject.OnNext(4);
});
Console.ReadKey();
sw.Stop();
}
private static void DoStuff(int i)
{
Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
}
}
Результат запуска этого прямо сейчас:
Пакет 1 (без задержки)
Обработка 1 при 508 мс
Пакет 2 (задержка 1 с)
Пакет 3 (задержка 1,3 с)
Пакет 4 (задержка 1,6 с)
Обработка 4 при 2114 мс
Обратите внимание, что пакет 2 не обрабатывается (это нормально!), потому что мы ожидаем, что 500 мс истечет между запросами из-за характера дросселя. Пакет 3 также не обрабатывается (что менее хорошо, потому что это произошло более 500 мс из партии 2) из-за его близости к партии 4.
То, что я ищу, больше похоже на это:
Пакет 1 (без задержки)
Обработка 1 при ~ 0мс
Пакет 2 (задержка 1 с)
Обработка 2 при ~ 1000 с
Пакет 3 (задержка 1,3 с)
Пакет 4 (задержка 1,6 с)
Обработка 4 на ~ 1600 с
Обратите внимание, что пакет 3 не будет обрабатываться в этом сценарии (это нормально!), поскольку он встречается в пределах 500 мс партии 2.
ИЗМЕНИТЬ
Вот реализация для метода расширения "StartNewDelayed", который я использую:
/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
this TaskFactory factory, int millisecondsDelay)
{
return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}
/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
// Validate arguments
if (factory == null) throw new ArgumentNullException("factory");
if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");
// Create the timed task
var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
var ctr = default(CancellationTokenRegistration);
// Create the timer but don't start it yet. If we start it now,
// it might fire before ctr has been set to the right registration.
var timer = new Timer(self =>
{
// Clean up both the cancellation token and the timer, and try to transition to completed
ctr.Dispose();
((Timer)self).Dispose();
tcs.TrySetResult(null);
});
// Register with the cancellation token.
if (cancellationToken.CanBeCanceled)
{
// When cancellation occurs, cancel the timer and try to transition to cancelled.
// There could be a race, but it benign.
ctr = cancellationToken.Register(() =>
{
timer.Dispose();
tcs.TrySetCanceled();
});
}
if (millisecondsDelay > 0)
{
// Start the timer and hand back the task...
timer.Change(millisecondsDelay, Timeout.Infinite);
}
else
{
// Just complete the task, and keep execution on the current thread.
ctr.Dispose();
tcs.TrySetResult(null);
timer.Dispose();
}
return tcs.Task;
}
Ответы
Ответ 1
Исходный ответ, который я опубликовал, имеет недостаток: метод Window
, когда используется с Observable.Interval
для обозначения конца окна, устанавливает бесконечную серию окон размером 500 мкс. Мне действительно нужно окно, которое начинается, когда первый результат закачивается в объект и заканчивается после 500 мс.
Мои выборочные данные замаскировали эту проблему, потому что данные были хорошо разбиты на окна, которые уже собирались создать. (то есть 0-500 мс, 501-1000 мс, 1001-1500 мс и т.д.).
Рассмотрим вместо этого время:
factory.StartNewDelayed(300,() =>
{
Console.WriteLine("Batch 1 (300ms delay)");
subject.OnNext(1);
});
factory.StartNewDelayed(700, () =>
{
Console.WriteLine("Batch 2 (700ms delay)");
subject.OnNext(2);
});
factory.StartNewDelayed(1300, () =>
{
Console.WriteLine("Batch 3 (1.3s delay)");
subject.OnNext(3);
});
factory.StartNewDelayed(1600, () =>
{
Console.WriteLine("Batch 4 (1.6s delay)");
subject.OnNext(4);
});
Я получаю:
Пакет 1 (задержка 300 мс)
Обработка 1 при 356 мс
Пакет 2 (задержка 700 мс)
Обработка 2 при 750 мс
Пакет 3 (задержка 1,3 с)
Обработка 3 при 1346 мс
Пакет 4 (задержка 1,6 с)
Обработка 4 при 1644 мс
Это происходит потому, что окна начинаются с 0 мс, 500 мс, 1000 мс и 1500 мс, и поэтому каждый Subject.OnNext
прекрасно вписывается в собственное окно.
Я хочу:
Пакет 1 (задержка 300 мс)
Обработка 1 при ~ 300 мс
Пакет 2 (задержка 700 мс)
Пакет 3 (задержка 1,3 с)
Обработка 3 при ~ 1300 мс
Пакет 4 (задержка 1,6 с)
После долгих битв и часа, стучащих на него вместе с коллегой, мы пришли к лучшему решению, используя чистую Rx и одну локальную переменную:
bool isCoolingDown = false;
subject
.Where(_ => !isCoolingDown)
.Subscribe(
i =>
{
DoStuff(i);
isCoolingDown = true;
Observable
.Interval(cooldownInterval)
.Take(1)
.Subscribe(_ => isCoolingDown = false);
});
Наше предположение заключается в том, что вызовы метода подписки синхронизируются. Если это не так, тогда можно ввести простой замок.
Ответ 2
Вот мой подход. Это похоже на то, что было раньше, но оно не страдает от чрезмерно усердной проблемы с производством окон.
Желаемая функция работает так же, как Observable.Throttle
, но испускает квалификационные события, как только они поступают, а не задерживается на время дросселя или периода выборки. В течение заданной продолжительности после квалификационного события последующие события подавляются.
Указано как проверяемый метод расширения:
public static class ObservableExtensions
{
public static IObservable<T> SampleFirst<T>(
this IObservable<T> source,
TimeSpan sampleDuration,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return source.Publish(ps =>
ps.Window(() => ps.Delay(sampleDuration,scheduler))
.SelectMany(x => x.Take(1)));
}
}
Идея состоит в том, чтобы использовать перегрузку Window
, которая создает неперекрывающиеся окна с помощью windowClosingSelector
, которая использует исходное временное смещение назад sampleDuration
. Поэтому каждое окно будет: (a) закрыто первым элементом в нем и (b) оставаться открытым до тех пор, пока не будет разрешен новый элемент. Затем мы просто выбираем первый элемент из каждого окна.
Версия Rx 1.x
Используемый выше метод расширения Publish
недоступен в Rx 1.x. Вот альтернатива:
public static class ObservableExtensions
{
public static IObservable<T> SampleFirst<T>(
this IObservable<T> source,
TimeSpan sampleDuration,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
var sourcePub = source.Publish().RefCount();
return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler))
.SelectMany(x => x.Take(1));
}
}
Ответ 3
Решение, которое я нашел после большого количества проб и ошибок, заключалось в замене дроссельной подписки на следующее:
subject
.Window(() => { return Observable.Interval(timeout); })
.SelectMany(x => x.Take(1))
.Subscribe(i => DoStuff(i));
Отредактировано для включения очистки Paul.
Ответ 4
Удивительное решение Эндрю! Мы можем сделать это еще дальше и очистить внутреннюю подписку.
subject
.Window(() => { return Observable.Interval(timeout); })
.SelectMany(x => x.Take(1))
.Subscribe(DoStuff);
Ответ 5
Ну, самое очевидное - использовать здесь функцию Repeat(). Однако, насколько я знаю, Repeat() может создавать проблемы, так что уведомления исчезают между моментом прекращения потока и подписываются снова. На практике это никогда не было проблемой для меня.
subject
.Take(1)
.Concat(Observable.Empty<long>().Delay(TimeSpan.FromMilliseconds(500)))
.Repeat();
Не забудьте заменить фактический тип источника.
UPDATE:
Обновленный запрос для использования Concat вместо Merge
Ответ 6
У меня есть еще один для тебя. Это не использует Repeat() или Interval(), поэтому может быть и то, что вы после этого:
subject
.Window(() => Observable.Timer(TimeSpan.FromMilliseconds(500)))
.SelectMany(x => x.Take(1));
Ответ 7
Я наткнулся на этот вопрос, пытаясь повторно реализовать собственное решение той же или подобной проблемы, используя .Window
Взгляните, похоже, это то же самое, что и этот, и решил довольно элегантно:
fooobar.com/questions/207414/...
Ответ 8
Используйте .Scan()
!
Это то, что я использую для Throttling, когда мне нужен первый хит (через определенный период) сразу, но задержка (и group/ignore) любых последующих хитов.
В основном работает как Throttle, но срабатывает немедленно, если предыдущий onNext был >= interval
назад, иначе, заплатите его ровно interval
от предыдущего удара. И, конечно, если в период "охлаждения" появляются множественные удары, дополнительные игнорируются, как и Throttle.
Разница с вашим случаем использования заключается в том, что если вы получите событие в 0 мс и 100 мс, они оба будут обрабатываться (в 0 мс и 500 мс), что может быть тем, что вы действительно хотите (в противном случае, аккумулятор легко адаптируется для игнорирования ЛЮБОЙ удар ближе, чем interval
к предыдущему).
public static IObservable<T> QuickThrottle<T>(this IObservable<T> src, TimeSpan interval, IScheduler scheduler)
{
return src
.Scan(new ValueAndDueTime<T>(), (prev, id) => AccumulateForQuickThrottle(prev, id, interval, scheduler))
.Where(vd => !vd.Ignore)
.SelectMany(sc => Observable.Timer(sc.DueTime, scheduler).Select(_ => sc.Value));
}
private static ValueAndDueTime<T> AccumulateForQuickThrottle<T>(ValueAndDueTime<T> prev, T value, TimeSpan interval, IScheduler s)
{
var now = s.Now;
// Ignore this completely if there is already a future item scheduled
// but do keep the dueTime for accumulation!
if (prev.DueTime > now) return new ValueAndDueTime<T> { DueTime = prev.DueTime, Ignore = true };
// Schedule this item at at least interval from the previous
var min = prev.DueTime + interval;
var nextTime = (now < min) ? min : now;
return new ValueAndDueTime<T> { DueTime = nextTime, Value = value };
}
private class ValueAndDueTime<T>
{
public DateTimeOffset DueTime;
public T Value;
public bool Ignore;
}
Ответ 9
Это старый пост, но ни один ответ не мог полностью удовлетворить мои потребности, поэтому я даю свое собственное решение:
public static IObservable<T> ThrottleOrImmediate<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
return Observable.Create<T>((obs, token) =>
{
// Next item cannot be send before that time
DateTime nextItemTime = default;
return Task.FromResult(source.Subscribe(async item =>
{
var currentTime = DateTime.Now;
// If we already reach the next item time
if (currentTime - nextItemTime >= TimeSpan.Zero)
{
// Following item will be send only after the set delay
nextItemTime = currentTime + delay;
// send current item with scheduler
scheduler.Schedule(() => obs.OnNext(item));
}
// There is still time before we can send an item
else
{
// we schedule the time for the following item
nextItemTime = currentTime + delay;
try
{
await Task.Delay(delay, token);
}
catch (TaskCanceledException)
{
return;
}
// If next item schedule was change by another item then we stop here
if (nextItemTime > currentTime + delay)
return;
else
{
// Set next possible time for an item and send item with scheduler
nextItemTime = currentTime + delay;
scheduler.Schedule(() => obs.OnNext(item));
}
}
}));
});
}
Первый элемент немедленно отправляется, затем следующие элементы удушаются. Затем, если следующий элемент будет отправлен позже установленного времени, он также будет немедленно отправлен.