ActiveMQ: как обращаться с отказоустойчивостью брокера при использовании временных очередей
В моих JMS-приложениях мы используем временные очереди для Продюсеров, чтобы получать ответы от потребительских приложений.
Я столкнулся с такой же проблемой на моем конце, как упоминалось в этом потоке: http://activemq.2283324.n4.nabble.com/jira-Created-AMQ-3336-Temporary-Destination-errors-on-H-A-failover-in-broker-network-with-Failover-tt-td3551034.html#a3612738
Всякий раз, когда я перезапускал произвольный брокер в своей сети, у меня возникало много ошибок, подобных этому в моем журнале приложений-приложениях при попытке отправить ответ на временную очередь:
javax.jms.InvalidDestinationException:
Cannot publish to a deleted Destination: temp-queue://ID:...
Затем я увидел ответ Гэри, предлагающий использовать
jms.watchTopicAdvisories=false
как параметр url на клиенте brokerURL
. С этим дополнительным параметром я быстро изменил URL-адреса клиентских брокеров. Однако теперь я вижу ошибки, подобные этому, когда я перезапускаю своих брокеров в сети для этого тестирования после сбоя:
javax.jms.JMSException:
The destination temp-queue:
//ID:client.host-65070-1308610734958-2:1:1 does not exist.
Я использую версию ActiveMQ 5.5. И мой URL-адрес брокера клиента выглядит следующим образом:
failover:(tcp://amq-host1:61616,tcp://amq-host2.tred.aol.com:61616,tcp://amq-host3:61616,tcp://amq-host4:61616)?jms.useAsyncSend=true&timeout=5000&jms.watchTopicAdvisories=false
Кроме того, здесь приведен мой XML файл activemq для одного из 4-х брокеров:
amq1.xml
Может кто-то здесь, пожалуйста, изучите эту проблему и предложите мне, какую ошибку я делаю в этой настройке.
Update:
Чтобы уточнить, как я выполняю запрос-ответ в своем коде:
- Я уже использую назначение для каждого производителя (т.е. временную очередь) и устанавливаю его в заголовке ответа на каждое сообщение.
- Я уже отправляю уникальный идентификатор корреляции для сообщения в заголовке JMSCorrelationID.
- Насколько я знаю, даже Camel и Spring также используют временную очередь для механизма запроса-ответа. Единственное отличие состоит в том, что реализация Spring JMS создает и уничтожает временную очередь для каждого сообщения, тогда как я создаю временную очередь для жизни продюсера. Эта временная очередь уничтожается при закрытии приложения клиента (продюсера) или брокером AMQ, когда он понимает, что с этой временной очередью нет активного производителя.
- Я уже устанавливаю истечение сообщения для каждого сообщения на стороне Продюсера, чтобы сообщение не задерживалось в очереди слишком долго (60 секунд).
Ответы
Ответ 1
Существует атрибут брокера, org.apache.activemq.broker.BrokerService # cacheTempDestinations, который должен помочь в отказе: case.
Установите для этого значение true в xml-конфигурации, и временный пункт назначения не будет удален сразу, когда клиент отключится.
Быстрый переход на другой ресурс: повторное подключение сможет продлить и/или потреблять из очереди temp снова.
Существует задача таймера, основанная на timeBeforePurgeTempDestinations (по умолчанию 5 секунд), которая обрабатывает удаление кеша.
Одно предупреждение, но я не вижу никаких тестов в activemq-core, которые используют этот атрибут, поэтому я не могу дать вам никаких гарантий на этом.
Ответ 2
Временные очереди создаются в брокере, к которому подключается запросчик (производитель) в сценарии запроса-ответа. Они создаются с помощью javax.jms.Session
, поэтому при отключении этого сеанса либо из-за отказа клиента, либо от отказа/отказа брокера эти очереди постоянно исчезают. Ни один из других брокеров не поймет, что имеется в виду, когда один из ваших потребителей пытается ответить на эти очереди; следовательно, ваше исключение.
Это требует архитектурного сдвига в мышлении, предполагая, что вы хотите справиться с отказоустойчивостью и сохранить все свои сообщения. Вот общий способ атаки на проблему:
- Ваши ответные заголовки должны ссылаться на очередь, относящуюся к процессу запроса: например.
queue:response.<client id>
. Идентификатор клиента может быть стандартным именем, если у вас ограниченное количество клиентов или UUID, если у вас их большое количество.
- Исходящее сообщение должно установить идентификатор корреляции (просто sting, который позволяет связать запрос с запросами ответа - после того, как все могут сделать более одного запроса одновременно). Это указано в заголовке
JMSCorrelationID
и должно быть скопировано из запроса в ответное сообщение.
- Проситель должен настроить слушателя в этой очереди, которая вернет тело сообщения в запрашивающий поток на основе этого идентификатора корреляции. Для этого нужно написать многопотоковый код, так как вам нужно вручную управлять чем-то вроде карты корреляционных идентификаторов с исходными потоками (возможно, через Futures).
Это аналогичный подход к использованию Apache Camel для запроса-ответа по обмену сообщениями.
Одна вещь, о которой следует помнить, заключается в том, что очередь не уходит, когда клиент делает, поэтому вы должны установить время для жизни в ответном сообщении, чтобы оно удалялось от брокера, если оно не было уничтожено, иначе вы получите отставание от непонятных сообщений. Вам также нужно будет настроить стратегию очереди с мертвой буквой, чтобы автоматически отменить истекшие сообщения.