Сигнал откат от JMS MessageListener
Я работаю с JMS и ActiveMQ. Все творит чудеса. Я не использую spring, и я не могу.
Интерфейс javax.jms.MessageListener
имеет только один метод, onMessage
. В рамках реализации есть шанс, что будет выбрано исключение. Если на самом деле возникает исключение, то я говорю, что сообщение не обработано должным образом, и его необходимо повторить. Итак, мне нужно ActiveMQ подождать немного, а затем повторить попытку. т.е. мне нужно исключение для отката транзакции JMS.
Как я могу выполнить такое поведение?
Возможно, в ActiveMQ есть некоторая конфигурация, которую я не смог найти.
Или... может быть, может покончить с регистрацией MessageListener
для потребителей и самостоятельно использовать сообщения в виде цикла, например:
while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}
в паре потоков, вместо регистрации слушателя.
Или... Я мог бы как-то украсить/AOP/byte-манипулировать MessageListener
, чтобы сделать это самостоятельно.
Какой маршрут вы возьмете и почему?
note: у меня нет полного контроля над кодом MessageListener
.
ИЗМЕНИТЬ
Тест для доказательства концепции:
@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
Ответы
Ответ 1
Если вы хотите использовать SESSION_TRANSACTED в качестве режима подтверждения, вам необходимо настроить RedeliveryPolicy в Connection/ConnectionFactory. Эта страница на сайте ActiveMQ также содержит некоторую полезную информацию о том, что вам может понадобиться.
Поскольку вы не используете Spring, вы можете настроить RedeliveryPolicy с чем-то похожим на следующий код (взятый из одной из приведенных выше ссылок):
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
Edit
Добавив в ответ ваш фрагмент кода, ниже показано, как это работает с транзакциями. Попробуйте этот код с помощью метода Session.rollback(), и вы увидите, что использование SESION_TRANSACTED и Session.commit/rollback работает как ожидалось:
@Test
public void test() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
session.commit();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
session.rollback();
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
session.commit();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
assertEquals(3, atomicInteger.get());
}
}
Ответ 2
Вам нужно установить режим подтверждения на Session.CLIENT_ACKNOWLEDGE, клиент подтверждает принятое сообщение, вызвав метод подтверждения сообщения.
QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Затем после обработки сообщения необходимо вызвать метод Message.acknowledge(), чтобы удалить это сообщение.
Message message = ...;
// Processing message
message.acknowledge();
Ответ 3
Если ваш сеанс трансактен, тогда "confirmMode" игнорируется в любом случае. Итак, просто оставьте свой сеанс транзакцией и используйте session.rollback и session.commit для фиксации или отката транзакции.