Использование ZeroMQ вместе с Boost:: ASIO
У меня есть приложение на С++, которое использует ZeroMQ для некоторых сообщений. Но он также должен обеспечить соединение SGCI для веб-службы на основе AJAX/Comet.
Для этого мне нужен обычный TCP-сокет. Я мог бы сделать это с помощью обычных гнезд Posix, но оставайтесь на переносной платформе и упростите мою жизнь (надеюсь...) Я думал об использовании Boost:: ASIO.
Но теперь у меня есть столкновение ZMQ, которое хочет использовать его собственный zmq_poll()
и ASIO it io_service.run()
...
Есть ли способ заставить ASIO работать вместе с 0MQ zmq_poll()
?
Или есть другой рекомендуемый способ для достижения такой настройки?
Примечание. Я мог бы решить это, используя несколько потоков - но это всего лишь небольшой ядро /процессорный ящик, который будет запускать эту программу с очень низким количеством трафика SCGI, поэтому многопоточность будет пустой тратой ресурсов...
Ответы
Ответ 1
После прочтения документации здесь и здесь, в частности, этот пункт
ZMQ_FD: получить файловый дескриптор, связанный с сокетом. ZMQ_FD опция должна получить файловый дескриптор, связанный с указанный сокет. Дескриптор возвращаемого файла можно использовать для интегрировать сокет в существующий цикл событий; библиотека ØMQ должен сигнализировать о любых ожидающих событиях на сокете в срабатывании с краем путем создания файлового дескриптора для чтения.
Я думаю, вы можете использовать null_buffers
для каждого zmq_pollitem_t
и отложить цикл событий до io_service
, полностью обходя zmq_poll()
вообще. Однако в вышеупомянутой документации есть некоторые предостережения, особенно
Возможность чтения из дескриптора возвращенного файла не обязательно указывать, что сообщения доступны для чтения или может быть записано в базовый сокет; приложения должны извлекать фактическое состояние события с последующим извлечением ZMQ_EVENTS вариант.
Итак, когда обработчик одного из ваших гнезд zmq запущен, вам нужно будет сделать немного больше работы, прежде чем обрабатывать событие, которое, как я думаю. Некомпилированный псевдокод ниже
const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
boost::asio::null_buffers(),
[=](const boost::system::error_code& error)
{
if (!error) {
// handle data ready to be read
}
}
);
обратите внимание, что вам не нужно использовать лямбда здесь, boost::bind
для функции-члена будет достаточно.
Ответ 2
В конце концов я понял, что есть два возможных решения:
- Сэм Миллер, где мы используем цикл событий ASIO
- Цикл событий ZeroMQ, получая дескрипторы файла ASIO, хотя методы
.native()
acceptor
и socket
и вставляя их в массив zmq_pollitem_t
Я согласился с ответом Сэма Миллера как на то, что для меня лучшее решение в случае SCGI, где постоянно создаются и заканчиваются постоянно новые соединения. Обработка каждого изменяющегося массива zmq_pollitem_t
- это большая проблема, которой можно избежать с помощью цикла событий ASIO.
Ответ 3
Получение сокета для ZeroMQ - самая маленькая часть битвы. ZeroMQ основан на протоколе который накладывается поверх TCP, поэтому вам придется повторно реализовать ZeroMQ в пользовательском Boost.Asio io_service, если вы идете по этому маршруту, Я столкнулся с той же проблемой при создании асинхронной службы ENet с использованием Boost.Asio, сначала попробовав поймать трафик от клиента ENet, используя Boost.Asio UDP. ENet - это протокол TCP, наложенный на UDP, поэтому все, что я достиг в этот момент, - это захват пакетов в практически бесполезном состоянии.
Boost.Asio основан на шаблонах, а встроенные шаблоны io_service используют для обертывания библиотеки системных сокетов для создания служб TCP и UDP. Моим окончательным решением было создать пользовательский io_service, который обернул библиотеку ENet, а не библиотеку системных сокетов, позволяя ей использовать транспортные функции ENet, вместо того, чтобы переопределять их с помощью встроенного транспорта UDP.
То же самое можно сделать для ZeroMQ, но ZeroMQ уже является очень высокопроизводительной сетевой библиотекой, которая сама по себе уже предоставляет асинхронный ввод-вывод. Я думаю, вы можете создать жизнеспособное решение, получая сообщения с использованием существующего API ZeroMQ и передавая сообщения в пул потоков io_service. Таким образом, сообщения/задачи будут по-прежнему обрабатываться асинхронно с использованием диаграммы Boost.Asio без необходимости перезаписывать что-либо. ZeroMQ предоставит асинхронный ввод-вывод, Boost.Asio предоставит обработчики/рабочие задачи async.
Существующий io_service все еще может быть подключен к существующему сокету TCP, позволяя threadpool обрабатывать как TCP (HTTP в вашем случае), так и ZeroMQ. Это вполне возможно в такой настройке для обработчиков задач ZeroMQ для доступа к объектам сеанса служб TCP, что позволяет отправлять результаты сообщения/задачи ZeroMQ обратно на клиент TCP.
Ниже приведена только иллюстрация концепции.
// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
threads.push_back(thread);
}
while (1) {
char buffer [10];
zmq_recv (responder_, buffer, 10, 0);
io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
Ответ 4
Решением является также опрос вашего io_service, а не run().
Просмотрите это решение для некоторого poll() info.
Использование опроса вместо запуска позволит вам опросить zmq-соединения без каких-либо проблем с блокировкой.