RabbitMQ: постоянное сообщение с обменом темой
Я очень новичок в RabbitMQ.
Я установил обмен темой. Потребители могут быть запущены после издателя. Я бы хотел, чтобы потребители могли получать сообщения, которые были отправлены до того, как они появились, и которые еще не были использованы.
Обмен настроен со следующими параметрами:
exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0
Сообщения публикуются с помощью этого параметра:
delivery_mode => 2
Потребители используют get() для извлечения сообщений из обмена.
К сожалению, любое сообщение, опубликованное до того, как какой-либо клиент был потерян, потерян. Я использовал разные комбинации.
Я думаю, моя проблема в том, что обмен не содержит сообщений. Возможно, мне нужно иметь очередь между издателем и очередью. Но это, похоже, не работает с обменом "темами", где сообщения направляются с помощью ключа.
Любая идея, как я должен действовать. Я использую Perl-привязку Net:: RabbitMQ (не имеет значения) и RabbitMQ 2.2.0.
Ответы
Ответ 1
Вам нужна прочная очередь для хранения сообщений, если нет подключенных потребителей, доступных для обработки сообщений в момент их публикации.
Обмен не сохраняет сообщения, но может быть в очереди. Запутанная часть заключается в том, что обмен может быть отмечен как "долговечный", но все, что на самом деле означает, что сам обмен будет по-прежнему присутствовать, если вы перезапустите своего брокера, но он не означает, что любые сообщения, отправленные что обмен автоматически сохраняется.
Учитывая это, вот два варианта:
- Выполните административный шаг, прежде чем запускать издатели для создания очереди (ов) самостоятельно. Для этого вы можете использовать веб-интерфейс или инструменты командной строки. Убедитесь, что вы создали его как прочную очередь, чтобы он сохранял любые сообщения, которые были перенаправлены на него, даже если активных пользователей нет.
- Предполагая, что ваши потребители закодированы, чтобы всегда объявлять (и, следовательно, автоматически создавать) свои обмены и очереди при запуске (и объявлять их как долговечные), просто запустить всех своих потребителей хотя бы один раз до начиная с любых издателей. Это обеспечит правильное создание всех ваших очередей. Затем вы можете отключить потребителей, пока они действительно не понадобятся, потому что очереди будут постоянно хранить любые будущие сообщения, направленные им.
Я бы пошел на # 1. Не может быть много шагов для выполнения, и вы всегда можете script выполнить необходимые шаги, чтобы их можно было повторить. Плюс, если все ваши потребители будут тянуть из одной и той же очереди (а не иметь выделенную очередь), это действительно минимальная часть административных накладных расходов.
Очереди - это то, что нужно контролировать и контролировать. В противном случае вы можете закончить тем, что потребители-изгои объявляют прочные очереди, используя их в течение нескольких минут, но никогда больше. Вскоре после того, как у вас будет постоянно растущая очередь, ничто не уменьшит ее размер и надвигающийся апокалипсис брокера.
Ответ 2
Как упоминал Брайан, обмен не хранит сообщения и в основном отвечает за маршрутизацию сообщений на другой обмен /s или queue/s. Если обмен не связан с очередью, все сообщения, отправленные на этот обмен, будут "потеряны"
Вам не нужно объявлять постоянные клиентские очереди в издателе script, так как это может не быть масштабируемым. Очереди могут динамически создаваться вашими издателями и маршрутизироваться внутри, используя привязку обмена к обмену.
RabbitMQ поддерживает привязки обмена к обмену, которые позволят гибкость, развязку топологии и другие преимущества. Вы можете прочитать здесь RabbitMQ Exchange для привязки Exchange [AMPQ]
RabbitMQ Exchange для обмена ссылками
![Example Topology]()
Пример кода Python для создания привязки обмена к обмену с персистентностью, если потребитель не присутствует с использованием очереди.
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')