Динамически регистрировать прослушиватель транзакций с помощью spring?
У меня есть приложение springframework, в котором я хотел бы добавить прослушиватель транзакций к транзакции, которая в настоящее время выполняется. Мотивация заключается в том, чтобы инициировать действие post commit, которое уведомляет о нижестоящих системах. Я использую @Transactional для переноса транзакции вокруг некоторого метода службы - именно там я хочу создать/зарегистрировать прослушиватель транзакций. Я хочу сделать что-то вроде "следующего".
public class MyService {
@Transaction
public void doIt() {
modifyObjects();
// something like this
getTransactionManager().registerPostCommitAction(new
TransactionSynchronizationAdapter() {
public void afterCommit() {
notifyDownstream();
}
});
}
}
Spring имеет интерфейс TransactionSynchronization и класс адаптера, который кажется именно тем, что я хочу; однако сразу не ясно, как динамически регистрироваться либо с текущей транзакцией, либо с менеджером транзакций. Я бы предпочел не подклассифицировать JtaTransactionManager, если я могу его избежать.
Q: Кто-нибудь сделал это раньше.
Q: Каков самый простой способ регистрации моего адаптера?
Ответы
Ответ 1
вы могли бы использовать аспект, чтобы соответствовать аспекту транзакционных методов в вашем сервисе, чтобы выполнить это:
@Aspect
public class AfterReturningExample {
@AfterReturning("execution(* com.mypackage.MyService.*(..))")
public void afterReturning() {
// ...
}
}
Ответ 2
На самом деле это было не так сложно, как я думал; spring имеет статический вспомогательный класс, который помещает "правильный" материал в контекст потока.
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
s_logger.info("TRANSACTION COMPLETE!!!");
}
}
);
Ответ 3
Вот более полное решение, которое я сделал для аналогичной проблемы, что, если мои сообщения отправляются после совершения транзакций (я мог бы использовать RabbitMQ TX, но они довольно медленные).
public class MessageBusUtils {
public static Optional<MessageBusResourceHolder> getTransactionalResourceHolder(TxMessageBus messageBus) {
if ( ! TransactionSynchronizationManager.isActualTransactionActive()) {
return Optional.absent();
}
MessageBusResourceHolder o = (MessageBusResourceHolder) TransactionSynchronizationManager.getResource(messageBus);
if (o != null) return Optional.of(o);
o = new MessageBusResourceHolder();
TransactionSynchronizationManager.bindResource(messageBus, o);
o.setSynchronizedWithTransaction(true);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new MessageBusResourceSynchronization(o, messageBus));
}
return Optional.of(o);
}
private static class MessageBusResourceSynchronization extends ResourceHolderSynchronization<MessageBusResourceHolder, TxMessageBus> {
private final TxMessageBus messageBus;
private final MessageBusResourceHolder holder;
public MessageBusResourceSynchronization(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey) {
super(resourceHolder, resourceKey);
this.messageBus = resourceKey;
this.holder = resourceHolder;
}
@Override
protected void cleanupResource(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey,
boolean committed) {
resourceHolder.getPendingMessages().clear();
}
@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
for (Object o : holder.getPendingMessages()) {
messageBus.post(o, false);
}
}
else {
holder.getPendingMessages().clear();
}
super.afterCompletion(status);
}
}
}
public class MessageBusResourceHolder extends ResourceHolderSupport {
private List<Object> pendingMessages = Lists.newArrayList();
public void addMessage(Object message) {
pendingMessages.add(message);
}
protected List<Object> getPendingMessages() {
return pendingMessages;
}
}
Теперь в своем классе, где вы действительно отправляете сообщение, вы будете делать
@Override
public void postAfterCommit(Object o) {
Optional<MessageBusResourceHolder> holder = MessageBusTxUtils.getTransactionalResourceHolder(this);
if (holder.isPresent()) {
holder.get().addMessage(o);
}
else {
post(o, false);
}
}
Извините за длинные образцы кодировки, но, надеюсь, кто-то покажет кому-то, как что-то сделать после фиксации.
Ответ 4
Имеет ли смысл переопределять диспетчер транзакций в методах фиксации и отката, вызывая super.commit()
в начале.