0MQ: Как использовать ZeroMQ в потоковом режиме?

Я прочитал руководство ZeroMq, и я наткнулся на следующее:

Вы НЕ ДОЛЖНЫ разделять сокеты ØMQ между потоки. Соединители ØMQ не поточно. Технически это возможно для этого, но он требует семафоров, замки или мьютексы. Это сделает ваш приложение медленное и хрупкое. Единственный место, где он удаленно разделять сокеты между потоками в языковые привязки, которые необходимо выполнить магия, как сбор мусора на розетки.

а затем:

Помните: не используйте и не закрывайте сокеты, кроме потока, который их создал.

Я также понял, что ZeroMQ Context является потокобезопасным.

Если класс регистрируется для события другого класса, в .Net это событие может быть вызвано из другого потока, кроме потока, в котором был создан прослушиватель.

Я думаю, что есть только два варианта, чтобы иметь возможность отправлять что-то через ZeroMQ-Sockets из обработчика событий:

  • Синхронизировать thread-invoking-thread-поток с потоком, созданным в ZeroMQ-
Socket Создайте новый ZeroMQ- Socket/получите существующий ZeroMQ- Socket для потока в обработчике событий, используя threadafe ZeroMQ- Context

Кажется, что 0MQ-Guide препятствует первому, и я не думаю, что создание нового ZeroMq-Socket для каждого потока является исполнительным/способ перехода.

Мой вопрос:
Каков правильный шаблон (как это должно быть) для публикации сообщений через 0MQ из обработчика событий?

Кроме того, у авторов руководства есть ZeroMQ-Binding для .Net в виду, когда они пишут:

Единственный место, где он удаленно разделять сокеты между потоками в языковые привязки, которые необходимо выполнить магия, как сбор мусора на Розетки.

Вот пример кода, чтобы подчеркнуть мою проблему/вопрос:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}

Ответы

Ответ 1

В .net framework v4 и выше вы можете использовать параллельную коллекцию для решения этой проблемы. А именно модель "Продюсер-потребитель". Несколько потоков (обработчики) могут отправлять данные в потокобезопасную очередь, и только один поток потребляет данные из очереди и отправляет их с помощью сокета.

Вот идея:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;

Ответ 2

Вы можете создать много сокетов 0MQ, конечно же, сколько у вас есть потоки. Если вы создаете сокет в одном потоке и используете его в другом, вы должны выполнить полный барьер памяти между двумя операциями. Все остальное приведет к странным случайным сбоям в libzmq, поскольку объекты сокетов не являются потокобезопасными.

Существует несколько обычных шаблонов, хотя я не знаю, как они относятся к .NET:

  • Создайте сокеты в потоках, которые используют их, период. Разделяйте контексты между потоками, тесно связанными с одним процессом, и создавайте отдельное содержимое в потоках, которые не связаны жестко. В API высокого уровня C (czmq) они называются прикрепленными и отдельными потоками.
  • Создайте сокет в родительском потоке и передайте время создания потока в прикрепленный поток. При вызове создания потока будет выполняться полный барьер памяти. С этого момента используйте сокет только в дочернем потоке. "use" означает recv, send, setsockopt, getsockopt и close.
  • Создайте сокет в одном потоке и используйте в другом, выполняя свой собственный полный барьер памяти между каждым использованием. Это очень деликатно, и если вы не знаете, что такое "барьер полной памяти", вы не должны этого делать.

Ответ 3

Не забудьте взглянуть на транспорт inproc. Возможно, было бы полезно использовать inproc://сокеты для interthread связи и иметь один поток, который открывает сокеты для общения с другими процессами/серверами.

Вам по-прежнему нужен хотя бы один сокет в потоке, но inproc файлы вообще не связаны с сетевым уровнем IP.