Примеры С++ и Python ZeroMQ 4.x PUB/SUB не работают
Я могу найти только старые исходные примеры на С++. В любом случае, я сделал свое, основанное на них. Здесь мой издатель в python:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")
while True:
msg = "hello"
socket.send_string(msg)
print("sent "+ msg)
sleep(5)
А вот абонент в С++:
void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
while (true) {
zmq_msg_t msg;
int rc;
rc = zmq_msg_init( & msg);
assert(rc == 0);
std::cout << "waiting for message..." << std::endl;
rc = zmq_msg_recv( & msg, subscriber, 0);
assert(rc == 1);
std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
zmq_msg_close( & msg);
}
Изначально я пробовал zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") );
, но я предполагаю, что должен получить все, если я не установлю это, не так ли? Поэтому я прокомментировал это.
Когда я запускаю код, я вижу "ожидание сообщения..." навсегда.
Я попытался прослушать TCP-трафик с помощью tcpdump
. Оказывается, что когда издатель включен, я вижу много мусора в порту 5563
, и когда я выключу издателя, они останавливаются. Когда я попробовал схему PUSH/PULL
, я мог видеть сообщение открытого текста в tcpdump
. (Я попытался нажать с nodejs и потянул с помощью С++, и он сработал).
Что я могу делать неправильно?
Я пробовал разные комбинации .bind()
, .connect()
, localhost
, 127.0.0.1
, но они тоже не будут работать.
UPDATE: я только что прочитал, что должен подписаться на что-то, поэтому я сделал zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 );
, чтобы подписаться на все, но я все еще ничего не получил
PyZMQ находится в версии 17.0.0.b3 и имеет ZeroMQ 4.2.3
С++ имеет ZeroMQ 4.2.2
ОБНОВЛЕНИЕ 2:
Обновляется как до 4.2.3, так и не работает.
Ответы
Ответ 1
это я, тот, который задал вопрос.
Мне удается работать с обменом socket.bind("tcp://*:5563")
на socket.connect("tcp://dns_address_of_my_dcker_container:5564")
в python,
и обменивая zmq_connect(subscriber, "tcp://localhost:5563")
на zmq_bind(subscriber, "tcp://*:5563")
на С++
Примеры, которые я нашел в Интернете, говорят, что я должен использовать bind
для издателя и connect
для подписчика, но это никоим образом не будет работать для меня. У кого-нибудь есть идея, почему?
Документация ZeroMQ гласит следующее:
Функция zmq_bind() связывает сокет с локальной конечной точкой, а затем принимает входящие соединения на этой конечной точке.
Функция zmq_connect() соединяет сокет с конечной точкой, а затем принимает входящие соединения на этой конечной точке.
У меня нет четкого представления о том, что изменилось, но оно сработало.
Ответ 2
"Думаю, я должен получить все, если я не установлю это, верно?"
Нет, это неверное предположение. Вам может понравиться коллекция моих других сообщений ZeroMQ здесь, о {plain-string | unicode | serialization}, а {performance-| трафик-} -изменяет фактическую политику ( SUB
файловую обработку фильтра темы в ранних версиях ZeroMQ и/или PUB
-проходную обработку для более поздних ) можно встретить в дизайне гетерогенных распределенных систем, используя ZeroMQ.
(Любой другой шаблон Scheable Formal Communication Archetype, как и наблюдаемый PUSH/PULL
, ничего не делает с политикой подписки, поэтому будет работать независимо от обработки сопоставления подписки с заданным набором фильтров тем.)к югу >
Шаг 0: сначала проверьте RTO-часть отправки, если она .send()
-s вообще:
Пусть макет быстрого питонического приемника, чтобы увидеть, если отправитель действительно отправляет что-то по полосе:
import zmq
aContext = zmq.Context() # .new Context
aSUB = aContext.socket( zmq.SUB ) # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" ) # .connect
aSUB.setsockopt( zmq.LINGER, 0 ) # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, "" ) # .set T-filter
MASK = "INF: .recv()-ed this:[{0:}]\n: waited {1: > 7d} [us]"
aClk = zmq.Stopwatch();
while True:
try:
aClk.start(); print MASK.format( aSUB.recv(),
aClk.stop()
)
except ( KeyboardInterrupt, SystemExit ):
pass
break
pass
aSUB.close() # .close ALWAYS!
aContext.term() # .term ALWAYS!
Об этом следует сообщить независимо от того, что PUB
-sender на самом деле .send()
на провод, а также фактическое время взаимодействия между сообщениями (в [us]
, рад, что ZeroMQ включил этот инструмент для отладки и настройки производительности/латентности).
Если ACK-ed, когда вы видите текущие INF:
-массы, на самом деле тикирующие на экране, продолжайте работать, и теперь имеет смысл перейти к следующему шагу.
Шаг 1: Проверьте следующий код получающей части:
#include <zmq.h>
void *aContext = zmq_ctx_new();
void *aSUB = zmq_socket( aContext, ZMQ_SUB ); std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
zmq_connect( aSUB, "tcp://127.0.0.1:5563" ); std::cout << "INF: .. zmq_connect() done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_LINGER, 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ... ) done" << std::endl;
int rc;
while (true) {
zmq_msg_t msg; /* Create an empty ØMQ message */
rc = zmq_msg_init (&msg); assert (rc == 0 && "EXC: in zmq_msg_init() call" );
std::cout << "INF: .. zmq_msg_init() done" << std::endl;
rc = zmq_msg_recv (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
std::cout << "INF: .. zmq_msg_recv() done: received [" << (char * ) zmq_msg_data( &msg ) << "]" << std::endl;
zmq_msg_close (&msg); /* Release message */
std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
}
zmq_close( aSUB ); std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
zmq_ctx_term( aContext ); std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
Ответ 3
Каково возвращаемое значение для zmq_setsockopt()
?
Затем вы должны использовать ""
вместо NULL
, они разные.
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );
Как определяет API:
Возвращаемое значение
Функция zmq_setsockopt()
должна возвращать ноль в случае успеха. В противном случае он должен вернуть -1 и установить errno
в одно из значений, указанных ниже.
...
Ответ 4
Правильный рецепт запуска шаблона PUB/SUB (независимо от языка):
Паб
-
socket(zmq.PUB)
-
bind("tcp://127.0.0.1:5555")
- (обычно просто кодировать() для строк, но вы также можете сжимать() или dumps() для объектов или даже для обоих)
encoded_topic = topic.encode()
encoded_msg = msg.encode()
-
send_multipart([encoded_topic, encoded_msg])
Sub
-
socket(zmq.SUB)
-
setsockopt(zmq.SUBSCRIBE, topic.encode())
-
connect("tcp://127.0.0.1:5555")
-
answer = recv_multipart()
- декодировать ответ
enc_topic, enc_msg = answer
topic = enc_topic.decode()
msg = enc_msg.decode()
В общем, шаги Pub - 2/Sub - 3 (т.е. bind/connect) и Pub - 3/Sub -
5 (то есть кодирование/декодирование или дампы/нагрузки) должны быть взаимодополняющими друг друга, чтобы работа могла работать.