0MQ: Как использовать ZeroMQ в потоковом режиме?
Я прочитал руководство ZeroMq, и я наткнулся на следующее:
Вы НЕ ДОЛЖНЫ разделять сокеты ØMQ между потоки. Соединители ØMQ не поточно. Технически это возможно для этого, но он требует семафоров, замки или мьютексы. Это сделает ваш приложение медленное и хрупкое. Единственный место, где он удаленно разделять сокеты между потоками в языковые привязки, которые необходимо выполнить магия, как сбор мусора на розетки.
а затем:
Помните: не используйте и не закрывайте сокеты, кроме потока, который их создал.
Я также понял, что ZeroMQ Context
является потокобезопасным.
Если класс регистрируется для события другого класса, в .Net это событие может быть вызвано из другого потока, кроме потока, в котором был создан прослушиватель.
Я думаю, что есть только два варианта, чтобы иметь возможность отправлять что-то через ZeroMQ-Sockets из обработчика событий:
- Синхронизировать thread-invoking-thread-поток с потоком, созданным в ZeroMQ-
Socket
Создайте новый ZeroMQ-
Socket
/получите существующий ZeroMQ-
Socket
для потока в обработчике событий, используя threadafe ZeroMQ-
Context
Кажется, что 0MQ-Guide препятствует первому, и я не думаю, что создание нового ZeroMq-Socket для каждого потока является исполнительным/способ перехода.
Мой вопрос:
Каков правильный шаблон (как это должно быть) для публикации сообщений через 0MQ из обработчика событий?
Кроме того, у авторов руководства есть ZeroMQ-Binding для .Net в виду, когда они пишут:
Единственный место, где он удаленно разделять сокеты между потоками в языковые привязки, которые необходимо выполнить магия, как сбор мусора на Розетки.
Вот пример кода, чтобы подчеркнуть мою проблему/вопрос:
public class ExampleClass
{
public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}
public class ByteEventArgs : EventArgs
{
public byte[] BytesToSend;
}
public class Dispatcher
{
ZMQ.Context ctx;
public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
{
this.ctx = mqcontext;
exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
}
void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
{
// this method might be called by a different thread. So I have to get a new socket etc?
using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
{
// init socket etc..... and finally:
socket.Send(e.BytesToSend);
}
// isn't that too much overhead?
}
}
Ответы
Ответ 1
В .net framework v4 и выше вы можете использовать параллельную коллекцию для решения этой проблемы. А именно модель "Продюсер-потребитель". Несколько потоков (обработчики) могут отправлять данные в потокобезопасную очередь, и только один поток потребляет данные из очереди и отправляет их с помощью сокета.
Вот идея:
sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);
// start single-threaded data send loop
Task.Factory.StartNew(() => {
using(var socket = context.Socket()) {
MyStuff stuffToSend;
// this enumerable will be blocking until CompleteAdding is called
foreach(var stuff in sendQueue.GetConsumingEnumerable())
socket.Send(stuff.Serialize());
}
});
// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
Ответ 2
Вы можете создать много сокетов 0MQ, конечно же, сколько у вас есть потоки. Если вы создаете сокет в одном потоке и используете его в другом, вы должны выполнить полный барьер памяти между двумя операциями. Все остальное приведет к странным случайным сбоям в libzmq, поскольку объекты сокетов не являются потокобезопасными.
Существует несколько обычных шаблонов, хотя я не знаю, как они относятся к .NET:
- Создайте сокеты в потоках, которые используют их, период. Разделяйте контексты между потоками, тесно связанными с одним процессом, и создавайте отдельное содержимое в потоках, которые не связаны жестко. В API высокого уровня C (czmq) они называются прикрепленными и отдельными потоками.
- Создайте сокет в родительском потоке и передайте время создания потока в прикрепленный поток. При вызове создания потока будет выполняться полный барьер памяти. С этого момента используйте сокет только в дочернем потоке. "use" означает recv, send, setsockopt, getsockopt и close.
- Создайте сокет в одном потоке и используйте в другом, выполняя свой собственный полный барьер памяти между каждым использованием. Это очень деликатно, и если вы не знаете, что такое "барьер полной памяти", вы не должны этого делать.
Ответ 3
Не забудьте взглянуть на транспорт inproc. Возможно, было бы полезно использовать inproc://сокеты для interthread связи и иметь один поток, который открывает сокеты для общения с другими процессами/серверами.
Вам по-прежнему нужен хотя бы один сокет в потоке, но inproc файлы вообще не связаны с сетевым уровнем IP.