Получить фильтр подписчика из гнезда ZMQ PUB
Я заметил в разделе "Вопросы и ответы" в разделе " Мониторинг", что невозможно получить список подключенных одноранговых узлов или получать уведомления, когда одноранговые соединения соединяются/разъединяются.
Означает ли это, что также невозможно узнать, какие темы знает сокет PUB/XPUB, который он должен опубликовать, исходя из обратной связи с обратной связью? Или есть какой-то способ доступа к этим данным?
Я знаю, что ZMQ> = 3.0 " поддерживает фильтрацию PUB/SUB у издателя ", но я действительно хочу фильтровать свой код приложения, используя знания ZMQ о том, на какие темы подписаны.
В моем случае я хочу опубликовать информацию о статусе робота. Некоторые темы включают основные аппаратные действия, такие как переключение линий выбора на АЦП для чтения значений IR.
У меня есть поток издателя, который работает на боте, который должен делать только это "чтение", чтобы получить ИК-данные, когда есть фактически подписчики. Однако, поскольку я могу только кормить строку в моем pub_sock.send, мне всегда приходится выполнять дорогостоящую операцию, даже если ZMQ собирается отказаться от этого сообщения, когда нет подписчиков.
У меня есть реализация, которая использует backcannel REQ/REP сокет для отправки информации о теме, которую мое приложение может проверить в своем цикле публикации, тем самым только собирая данные, которые необходимо собрать. Это кажется очень неэлегантным, поскольку, поскольку ZMQ уже должен иметь нужные мне данные, о чем свидетельствует его фильтрация у издателя.
Я заметил, что в этом сообщении списка рассылки OP, похоже, может видеть сообщения подписки, отправляемые в сокет XPUB.
Тем не менее, нет упоминания о том, как они это сделали, и я не вижу таких способностей в документах (по-прежнему глядя). Возможно, они просто использовали Wireshark (чтобы увидеть подписку на восходящий канал в разъем XPUB).
Ответы
Ответ 1
Используя zmq.XPUB
сокета zmq.XPUB
, есть способ обнаружить новых и уходящих абонентов. Следующий пример кода показывает, как:
# Publisher side
import zmq
ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket)
events = dict(poller.poll(1000))
if xpub_socket in events:
msg = xpub_socket.recv()
if msg[0] == b'\x01':
topic = msg[1:]
print "Topic '%s': new subscriber" % topic
elif msg[0] == b'\x00':
topic = msg[1:]
print "Topic '%s': subscriber left" % topic
Обратите внимание, что zmq.XSUB
сокета zmq.XSUB
не подписывается так же, как "нормальный" zmq.SUB
. Пример кода:
# Subscriber side
import zmq
ctx = zmq.Context.instance()
# Subscribing of zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)
# Subscribing zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher
Я также хотел бы указать zmq.XPUB_VERBOSE
сокета zmq.XPUB_VERBOSE
. Если установлено, все события подписки принимаются в сокете. Если они не установлены, дублируются подписки. См. Также следующее сообщение: ZMQ: нет подписного сообщения в гнезде XPUB для нескольких подписчиков (шаблон кэша последнего значения)
Ответ 2
По крайней мере, для случая сокета XPUB/XSUB вы можете сохранить состояние подписки путем пересылки и обработки пакетов вручную:
context = zmq.Context()
xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind('tcp://*:10000')
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind('tcp://*:10001')
poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
break
if xpub_socket in events:
message = xpub_socket.recv_multipart()
# HERE goes some subscription handle code which inspects
# message
xsub_socket.send_multipart(message)
if xsub_socket in events:
message = xsub_socket.recv_multipart()
xpub_socket.send_multipart(message)
(это код Python, но я предполагаю, что C/C++ выглядит очень похожим)
В настоящее время я работаю над этой темой, и я как можно скорее добавлю дополнительную информацию.