Избегание дублированных сообщений на JMS/ActiveMQ
Есть ли способ подавления дублированных сообщений в очереди, определенной на сервере ActiveMQ?
Я попытался определить JMSMessageID вручную (message.setJMSMessageID("uniqueid")), но сервер игнорирует эту модификацию и доставляет сообщение со встроенным сгенерированным JMSMessageID.
По спецификации я не нашел ссылки о том, как дедуплицировать сообщения.
В HornetQ, чтобы справиться с этой проблемой, нам нужно объявить специальное свойство HQ org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID при определении сообщения.
то есть:
Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
Кто-нибудь знает, есть ли подобное решение для ActiveMQ?
Ответы
Ответ 1
Вы должны посмотреть на Apache Camel, он предоставит потребительский компонент Idempotent, который будет работать с любым поставщиком JMS, см.: http://camel.apache.org/idempotent-consumer.html
Используя это в сочетании с компонентом ActiveMQ, использование JMS довольно просто, см.
http://camel.apache.org/activemq.html
Ответ 2
Я сомневаюсь, что ActiveMQ поддерживает его изначально, но должно быть легко реализовать идемпотентного потребителя. Способ сделать это - добавить уникальный идентификатор для каждого сообщения на стороне производителя, теперь на конечной цели, используя хранилище (db, cache и т.д.), Можно проверить, будет ли сообщение получено до и продолжить обработку на основе этой проверки.
Я вижу предыдущий вопрос об стекировании в тех же строках - Apache ActiveMQ 5.3 - Как настроить очередь на отклонение повторяющихся сообщений?, которые также могут помочь.
Ответ 3
Теперь есть поддержка для удаления повторяющихся сообщений, запеченных в транспорте ActiveMQ. См. Значения конфигурации auditDepth
и auditMaximumProducerNumber
в Руководство по конфигурации подключения.
Ответ 4
Существует способ сделать ActiveMQ для фильтрации дубликатов на основе свойства JMS. это связано с написанием плагина Activemq . Базовый фильтр брокера, который отправляет повторяющиеся сообщения в очередь с ошибкой, будет таким:
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.broker.ProducerBrokerExchange;
public class DuplicateFilterBroker extends BrokerFilter {
String messagePropertyName;
boolean switchValue;
public DuplicateFilterBroker(Broker next, String messagePropertyName) {
super(next);
this.messagePropertyName = messagePropertyName;
}
public boolean hasDuplicate(String propertyValue){
switchValue = propertyValue;
return switchValue;
}
public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception {
ActiveMQMessage amqmsg = (ActiveMQMessage)msg;
Object msgObj = msg.getMessage();
if (msgObj instanceof javax.jms.Message) {
javax.jms.Message jmsMsg = (javax.jms.Message) msgObj;
if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) {
super.send(producerExchange, msg);
}
else {
sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
}
}
}
}
Ответ 5
Казалось бы, как это предлагается в вопросе, работает и для ActiveMQ (2016/12). См. Руководство activemq-artemis. Это требует от производителя установки определенного свойства в сообщение.
Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
Однако класс, содержащий свойство, отличается:
org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID
, а значение свойства _AMQ_DUPL_ID
.