Ответ 1
То, что вы описываете, является, конечно, приоритетной.
Rx - это все о потоках событий, а не о очередях. Конечно, очереди много используются в Rx - но они не являются концепцией первого класса, большей частью детали реализации концепций Rx.
Хорошим примером того, где нам нужны очереди, является работа с медленным наблюдателем. События отправляются последовательно в Rx, и если события прибывают быстрее, чем наблюдатель может справиться с ними, тогда они должны быть поставлены в очередь против этого наблюдателя. Если есть много наблюдателей, то необходимо поддерживать несколько логических очередей, поскольку наблюдатели могут прогрессировать в разных шагах - и Rx предпочитает не держать их в блокировке.
"Back-pressure" - это концепция наблюдателей, обеспечивающих обратную связь с наблюдаемыми, чтобы позволить механизмам выдерживать давление более быстрого наблюдения - например, слияние или дросселирование. Rx не имеет первоклассного способа введения обратного давления - единственное встроенное средство, которое наблюдаемое наблюдение наблюдателей происходит через синхронный характер OnNext
. Любой другой механизм должен быть вне диапазона. Ваш вопрос напрямую связан с противодавлением, поскольку он применим только в случае медленного наблюдателя.
Я упоминаю все это, чтобы предоставить доказательства моей претензии о том, что Rx не является отличным выбором для обеспечения такой приоритетной отправки, которую вы ищете - действительно, первоклассный механизм очередей кажется более подходящим.
Чтобы решить эту проблему, вам необходимо самостоятельно управлять очередью приоритетов в пользовательском операторе. Чтобы переформулировать проблему: вы говорите, что если события происходят во время обработки наблюдателем события OnNext
, так что существует рассылка событий для отправки, то вместо типичной очереди FIFO, которую использует Rx, вы хотите отправить на основе некоторого приоритета.
Следует отметить, что в духе того, как Rx не удерживает несколько наблюдателей в блокированном шаге, параллельные наблюдатели потенциально могут видеть события в другом порядке, что может быть или не быть проблемой для вас. Вы можете использовать такой механизм, как Publish
, чтобы получить согласованность заказов, но вы, вероятно, не хотите этого делать, поскольку время доставки события в этом сценарии будет весьма непредсказуемым и неэффективным.
Я уверен, что есть более эффективные способы сделать это, но вот один пример доставки на основе очереди с приоритетом - вы можете расширить его для работы с несколькими потоками и приоритетами (или даже с приоритетами для каждого события), используя лучший (например, очередь приоритетов на основе b-дерева), но я решил сохранить это достаточно просто. Даже тогда обратите внимание на значительное количество проблем, с которыми приходится сталкиваться коду: обработка ошибок, завершение и т.д. - и я сделал выбор в отношении того, когда им сообщается о том, что, безусловно, есть много других допустимых вариантов.
Все-в-одном, эта реализация, безусловно, избавляет меня от идеи использования Rx для этого. Это достаточно сложно, что, вероятно, здесь есть ошибки. Как я уже сказал, для этого может быть более аккуратный код (особенно учитывая минимальные усилия, которые я вложил в это!), Но концептуально мне неудобно идея независимо от реализации:
public static class ObservableExtensions
{
public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
this IObservable<TSource> source,
IObservable<TSource> lowPriority,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(o => {
// BufferBlock from TPL dataflow is used as it is
// handily awaitable. package: Microsoft.Tpl.Dataflow
var loQueue = new BufferBlock<TSource>();
var hiQueue = new BufferBlock<TSource>();
var errorQueue = new BufferBlock<Exception>();
var done = new TaskCompletionSource<int>();
int doneCount = 0;
Action incDone = () => {
var dc = Interlocked.Increment(ref doneCount);
if(dc == 2)
done.SetResult(0);
};
source.Subscribe(
x => hiQueue.Post(x),
e => errorQueue.Post(e),
incDone);
lowPriority.Subscribe(
x => loQueue.Post(x),
e => errorQueue.Post(e),
incDone);
return scheduler.ScheduleAsync(async(ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
TSource nextItem;
if(hiQueue.TryReceive(out nextItem)
|| loQueue.TryReceive(out nextItem))
o.OnNext(nextItem);
else if(done.Task.IsCompleted)
{
o.OnCompleted();
return;
}
Exception error;
if(errorQueue.TryReceive(out error))
{
o.OnError(error);
return;
}
var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);
var loAvailableAsync = loQueue.OutputAvailableAsync(ct);
var errAvailableAsync =
errorQueue.OutputAvailableAsync(ct);
await Task.WhenAny(
hiAvailableAsync,
loAvailableAsync,
errAvailableAsync,
done.Task);
}
});
});
}
}
И пример использования:
void static Main()
{
var xs = Observable.Range(0, 3);
var ys = Observable.Range(10, 3);
var source = ys.MergeWithLowPriorityStream(xs);
source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
}
Сначала будут отображаться элементы ys
, указывая их более высокий приоритет.