Ответ 1
Когда вам нужно прослушивать разные сокеты в том же потоке, используйте poller:
ZMQ.Socket subscriber = ctx.socket(ZMQ.SUB)
ZMQ.Socket puller = ctx.socket(ZMQ.PULL)
Зарегистрировать сокеты с poller (POLLIN
прослушивает входящие сообщения)
ZMQ.Poller poller = ZMQ.Poller(2)
poller.register(subscriber, ZMQ.Poller.POLLIN)
poller.register(puller, ZMQ.Poller.POLLIN)
При опросе используйте цикл:
while( notInterrupted()){
poller.poll()
//subscriber registered at index '0'
if( poller.pollin(0))
subscriber.recv(ZMQ.DONTWAIT)
//puller registered at index '1'
if( poller.pollin(1))
puller.recv( ZMQ.DONTWAIT)
}
Выберите, как вы хотите опросить...
poller.poll()
блокирует до тех пор, пока не будет данных в любом сокете. poller.poll(1000)
блокирует 1 сек, затем истекает время.
Полллер уведомляет, когда в сокетах имеются данные (сообщения); это ваша работа, чтобы прочитать его.
При чтении сделайте это без блокировки: socket.recv( ZMQ.DONTWAIT)
. Несмотря на то, что poller.pollin(0)
проверяет, есть ли данные для чтения, вы хотите избежать блокировки вызовов внутри цикла опроса, иначе вы могли бы заблокировать poller из-за "застрявшего" сокета.
Итак, если на subscriber
отправлено два отдельных сообщения, вы должны дважды вызвать subscriber.recv()
, чтобы очистить poller, в противном случае, если вы вызываете subscriber.recv()
один раз, poller будет сообщать вам там другое сообщение для чтения. Таким образом, по сути, poller отслеживает доступность и количество сообщений, а не фактические сообщения.
Вам следует запустить примеры опроса и поиграть с кодом, это лучший способ узнать.
Отвечает ли это на ваш вопрос?