Могу ли я отправлять сообщения с помощью пользовательского алгоритма вместо циклического использования с помощью RabbitMQ?
Я использую функцию Round Robin RabbitMQ для отправки сообщений между несколькими потребителями, но только один из них получает фактическое сообщение за раз.
Моя проблема в том, что мои сообщения представляют задачи, и я хотел бы иметь локальные сеансы (состояние) для моих потребителей. Я знаю заранее, какие сообщения относятся к какой сессии, но я не знаю, что является лучшим способом (или есть способ?), Чтобы сделать отправку RabbitMQ потребителям с использованием указанного мной алгоритма.
Я не хочу писать свою собственную службу оркестровки, потому что она станет узким местом, и я не хочу, чтобы мои продюсеры знали, какой потребитель будет принимать свои сообщения, потому что я потеряю развязку, которую я получаю с помощью Кролика.
Есть ли способ заставить RabbitMQ отправлять мои сообщения потребителям на основе заранее определенного алгоритма/правила вместо циклического robin?
Уточнение: Я использую несколько микросервисов, написанных на разных языках, и каждая служба имеет свою собственную работу. Я общаюсь между ними, используя сообщения protobuf. Я даю каждое новое сообщение a UUID
. Если потребитель получает сообщение, он может создать от него ответное сообщение (это может быть не правильная терминология, поскольку производители и потребители развязаны, и они не знают друг о друге), и этот UUID
копируется в новое сообщение, Это формирует конвейер преобразования данных, и этот "процесс" идентифицируется UUID
(processId). Моя проблема в том, что возможно, что у меня есть несколько пользователей-работников, и мне нужен рабочий придерживаться до UUID
, если он видел это раньше. У меня есть эта потребность, потому что
- может быть локальное состояние для каждого процесса
- После завершения процесса я хочу очистить локальное состояние
- Микросервис может получать несколько сообщений для одного и того же процесса, и мне нужно различать, какое сообщение принадлежит к какому процессу
Так как RabbitMQ распределяет задачи между рабочими, используя round robin, я не могу заставить мои процессы придерживаться рабочего. У меня есть несколько оговорок:
- Производители отделены от потребителей, поэтому прямой обмен сообщениями не является вариантом
- Число рабочих не является постоянным (существует балансировщик нагрузки, который может запускать новые экземпляры рабочего).
Если есть обходное решение, которое не связано с изменением алгоритма round robin и не нарушает мои ограничения, это также нормально!
Ответы
Ответ 1
Если вы не хотите использовать службу оркестровки, вы можете попробовать вместо этого такую топологию:
![введите описание изображения здесь]()
Для простоты я предполагаю, что ваш processId
используется как ключ маршрутизации (в реальном мире вы можете захотеть сохранить его в заголовке и вместо этого использовать обмен header
).
Входящее сообщение будет приниматься входящим сообщением (type: direct), которое имеет атрибут alternative-exchange
, чтобы указать на No Session Exchange
(разветвление).
Вот что говорит RabbitMQ в разделе "Альтернативные обмены":
Иногда желательно, чтобы клиенты обрабатывали сообщения, которые не удалось выполнить обмену (т.е. либо потому, что не было связанных очередей, у нас нет соответствующих привязок).
Типичными примерами этого являются
- обнаружение, когда клиенты случайно или злонамеренно публикуют сообщения, которые не могут быть перенаправлены
- "или иначе" семантика маршрутизации, где обрабатываются некоторые сообщения, а остальные - с помощью общего обработчика
Функция альтернативного обмена RabbitMQ ( "AE" ) разрешает эти варианты использования.
(нас особенно интересует случай использования or else
)
Каждый потребитель будет создавать свою собственную очередь и привязать ее к входящему Exchange, используя processId(s)
для сеанса (ов), о котором он знает до сих пор, в качестве ключа маршрутизации привязки.
Таким образом, он будет получать сообщения только для интересующих сессий.
Кроме того, все пользователи будут привязываться к общему сеансу без сеанса.
Если появится сообщение с ранее неизвестным processId
, для него не будет привязки к нему, зарегистрированного в Incoming Exchange, поэтому он будет перенаправлен на сеанс No Session Exchange = > No Session Queue и отправлен на один из Потребителей обычным (круговым) способом.
Затем потребитель зарегистрирует новую привязку для него с помощью входящего обмена (т.е. запустит новый "сеанс" ), чтобы затем он получал все последующие сообщения с помощью этого processId
.
После завершения "сеанса" он должен удалить соответствующую привязку (т.е. закрыть "сеанс" ).