Reactive Framework как очередь сообщений с использованием BlockingCollection
В последнее время я занимаюсь некоторой работой с Reactive Framework и до сих пор очень люблю его. Я смотрю на замену традиционной очереди сообщений опроса некоторыми фильтрами IObservables для очистки моих операций с сервером. По-старому я имел дело с сообщениями, поступающими на сервер следующим образом:
// Start spinning the process message loop
Task.Factory.StartNew(() =>
{
while (true)
{
Command command = m_CommandQueue.Take();
ProcessMessage(command);
}
}, TaskCreationOptions.LongRunning);
Это приводит к потоку непрерывного опроса, который делегирует команды от клиентов методу ProcessMessage, где у меня есть ряд операторов if/else-if, которые определяют тип команды и работу делегата в зависимости от ее типа
Я заменяю это системой, управляемой событиями, с помощью Reactive, для которой я написал следующий код:
private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
private IObservable<BesiegedMessage> m_MessagePublisher;
m_MessagePublisher = m_MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default);
// All generic Server messages (containing no properties) will be processed here
IDisposable genericServerMessageSubscriber = m_MessagePublisher
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something with the generic server message here
}
Мой вопрос в том, что, хотя это работает, хорошо ли использовать блокирующую коллекцию в качестве поддержки для IObservable? Я не вижу, когда Take() когда-либо называется таким образом, что заставляет меня думать, что сообщения будут сбиваться в очереди, не удаляясь после их обработки?
Было бы более эффективным рассматривать объекты как коллекцию резервных копий для управления фильтром IObservables, который будет собирать эти сообщения? Есть ли что-то еще, что мне не хватает здесь, что может принести пользу архитектуре этой системы?
Ответы
Ответ 1
Здесь что-то тянет прямо из моего заднего - любое реальное решение будет очень сильно зависеть от вашего фактического использования, но здесь "Самая дешевая система очереди сообщений псевдонимов когда-либо":
Мысли/мотиваций:
- Преднамеренная экспозиция
IObservable<T>
, чтобы подписчики могли выполнять любую фильтрацию/кросс-подписок, которые они хотят
- Общая очередь не имеет значения, но
Register
и Publish
являются безопасными по типу (ish)
- YMMV с
Publish()
, где он находится, - попробуйте поэкспериментировать с его перемещением
- Обычно
Subject
- нет-нет, хотя в этом случае он делает для некоторого кода SIMPLE.
- Можно было бы "интернализировать" регистрацию, чтобы фактически сделать подписку, но тогда очередь должна была бы управлять созданным
IDisposables
- bah, пусть ваши клиенты справятся с этим!
Код:
public class TheCheapestPubSubEver
{
private Subject<object> _inner = new Subject<object>();
public IObservable<T> Register<T>()
{
return _inner.OfType<T>().Publish().RefCount();
}
public void Publish<T>(T message)
{
_inner.OnNext(message);
}
}
Использование:
void Main()
{
var queue = new TheCheapestPubSubEver();
var ofString = queue.Register<string>();
var ofInt = queue.Register<int>();
using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
{
queue.Publish("Foo");
queue.Publish(1);
Console.ReadLine();
}
}
Вывод:
A string! Foo
An int! 1
ОДНАКО, это не строго соблюдает "потребители потребления" - несколько регистров определенного типа приведут к нескольким вызовам наблюдателя - то есть:
var queue = new TheCheapestPubSubEver();
var ofString = queue.Register<string>();
var anotherOfString = queue.Register<string>();
var ofInt = queue.Register<int>();
using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}", s)))
{
queue.Publish("Foo");
queue.Publish(1);
Console.ReadLine();
}
Результаты в:
A string! Foo
Another string! Foo
An int! 1
Ответ 2
Вот полный обработанный пример, протестированный в Visual Studio 2012.
- Создайте новое консольное приложение С#.
- Щелкните правой кнопкой мыши на своем проекте, выберите "Управление пакетами NuGet" и добавьте "Reactive Extensions - Main
Библиотека".
Добавьте этот код С#:
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace DemoRX
{
class Program
{
static void Main(string[] args)
{
BlockingCollection<string> myQueue = new BlockingCollection<string>();
{
IObservable<string> ob = myQueue.
GetConsumingEnumerable().
ToObservable(TaskPoolScheduler.Default);
ob.Subscribe(p =>
{
// This handler will get called whenever
// anything appears on myQueue in the future.
Console.Write("Consuming: {0}\n",p);
});
}
// Now, adding items to myQueue will trigger the item to be consumed
// in the predefined handler.
myQueue.Add("a");
myQueue.Add("b");
myQueue.Add("c");
Console.Write("[any key to exit]\n");
Console.ReadKey();
}
}
}
Вы увидите это на консоли:
[any key to exit]
Consuming: a
Consuming: b
Consuming: c
Самое приятное в использовании RX заключается в том, что вы можете использовать полную мощность LINQ для фильтрации любых нежелательных сообщений. Например, добавьте предложение .Where
для фильтрации по "a" и наблюдайте, что происходит:
ob.Where(o => (o == "a")).Subscribe(p =>
{
// This will get called whenever something appears on myQueue.
Console.Write("Consuming: {0}\n",p);
});
Философские заметки
Преимущество этого метода при запуске выделенного потока для опроса очереди заключается в том, что вам не нужно беспокоиться об утилизации потока должным образом после выхода программы. Это означает, что вам не нужно беспокоиться о IDisposable или CancellationToken (что всегда требуется при работе с BlockingCollection или ваша программа может зависать при выходе с потоком, который отказывается умереть).
Поверьте мне, это не так просто, как вы думаете, чтобы написать полностью надежный код, чтобы потреблять события, выходящие из BlockingCollection. Я предпочитаю использовать RX-метод, как показано выше, поскольку его более чистый, более надежный, имеет меньше кода, и вы можете фильтровать с помощью LINQ.
Задержка
Я был удивлен, насколько быстро этот метод.
На моем Xeon X5650 @2.67Ghz требуется 5 секунд для обработки 10 миллионов событий, которые работают примерно на 0,5 микросекунды за каждое событие. Потребовалось 4,5 секунды, чтобы поместить элементы в BlockingCollection, поэтому RX вынимал их и обрабатывал их почти так же быстро, как и они.
Threading
Во всех моих тестах RX только развернул один поток для обработки задач в очереди.
Это означает, что у нас очень хороший шаблон: мы можем использовать RX для сбора входящих данных из нескольких потоков, поместить их в общую очередь, затем обрабатывать содержимое очереди в одном потоке (который по определению является потокобезопасным).
Этот шаблон устраняет огромное количество головных болей при работе с многопоточным кодом, путем развязки производителя и потребителя данных через очередь, где производитель может быть многопоточным, а потребитель - однопоточным и, следовательно, потокобезопасным. Это концепция, которая делает Эрланг таким надежным. Для получения дополнительной информации об этом шаблоне см. Многопоточность сделана смехотворно простой.
Ответ 3
Я не использовал BlockingCollection
в этом контексте - поэтому я "предполагаю" - вы должны запустить его, чтобы одобрить, опровергнуть.
BlockingCollection
может только еще больше усложнить ситуацию (или оказать небольшую помощь). Взгляните на этот пост от Джона - просто чтобы подтвердить. GetConsumingEnumerable
предоставит вам "для каждого абонента". Исчерпав их в конечном итоге - что-то иметь в виду с Rx.
Кроме того, IEnumerable<>.ToObservable
дополнительно выравнивает "источник". Поскольку это работает (вы можете найти источник - я бы рекомендовал w/Rx больше всего на свете) - каждая подписка создает собственный "перечислитель" - так что все будут получать свои собственные версии фида. Я действительно не уверен, как это работает в сценарии Observable, как это.
В любом случае - если вы хотите предоставить общедоступные сообщения - IMO, вам нужно ввести Subject
или указать в какой-либо другой форме (например, опубликовать и т.д.). И в этом смысле я не думаю, что BlockingCollection поможет любому, но опять же, вам лучше всего попробовать себя.
Примечание (философский)
Если вы хотите комбинировать типы сообщений или комбинировать разные источники - например, в более "реальном мире" сценарии - он становится более сложным. И это довольно интересно, я должен сказать.
Следите за тем, чтобы их "укоренились" в однопользовательский поток (и избегайте того, что верно предложил Иер).
Я бы рекомендовал, чтобы вы не пытались уклониться, используя Subject
. За то, что вам нужно, ваш друг - независимо от всех обсуждений, связанных с не состоянием, и того, как субъект плох, - у вас фактически есть состояние (и вам нужно "состояние" ) - Rx начинает "после факта", поэтому вы пользуйтесь им независимо.
Я призываю вас пойти так, как мне нравится, как это получилось.
Ответ 4
Моя проблема в том, что мы превратили Queue (который я обычно ассоциирую с деструктивными чтениями одного потребителя, особенно если вы используете BlockingCollection) в трансляцию (отправляйте всем и всем слушателям прямо сейчас).
Это кажутся двумя противоречивыми идеями.
Я видел это, но он был выброшен, поскольку это было "правильное решение неправильного вопроса".