Spring/RabbitMQ: управление транзакциями
Чтобы упростить мою проблему, у меня есть
App1 с помощью метода @Transactionnal createUser():
- Вставить нового пользователя в базу данных
- Добавить асинхронное сообщение в RabbitMQ, чтобы пользователь получал почту с уведомлением
- (возможно, дополнительный код, но не много)
App2 с сообщением о контенте RabbitMQ
- Сообщает сообщения в очереди рассылки в режиме реального времени
- Чтение почтовых данных в базе данных
- Отправить почту
Проблема заключается в том, что иногда App2 пытается использовать сообщение RabbitMQ до того, как транзакция даже совершена в App1. Это означает, что App2 не может читать почтовые данные в базе данных, потому что пользователь еще не создан.
Некоторые решения могут быть:
- Используйте уровень изоляции READ_UNCOMMITED в App2
- Добавьте некоторую задержку в доставке сообщений RabbitMQ (или какой-либо RetryTemplate на consummer)
- Измените способ отправки писем...
Я видел, что в Spring есть RabbitTransactionManager, но я не могу понять, как он должен работать. Внутренние элементы обработки транзакций всегда кажутся немного трудными для понимания, и документация также не помогает.
Есть ли способ сделать что-то вроде этого?
- Добавить сообщение в очередь RabbitMQ в методе @Transactionnal
- Когда транзакция завершается, сообщение отправляется в очередь, а изменения заносятся в базу данных
- Чтобы сообщение не могло быть использовано до завершения транзакции db
Как? И что ожидать, например, если я отправляю синхронные сообщения RabbitMQ вместо асинхронных сообщений? Будет ли он блокировать поток, ожидающий ответа или что-то еще?
Потому что мы отправляем сообщения синхронизации и асинхронного сообщения для разных случаев использования.
Ответы
Ответ 1
Я не очень хорошо знаком с @Transactionnal и spring, но в AMQP стандартная очередь сообщений не является транзакционной операцией, поэтому вам нужно хранить данные в db (если соединение db транзакционно - транзакция фиксации) и только после этого отправить сообщение брокеру.
Правильный рабочий процесс выглядит для меня как App1: createUser -> notifyUser; App2: listenForNotifications
Ответ 2
Я знаю, что это поздно, но у меня была такая же проблема из-за моего ограниченного понимания @Transactional в то время. Так что это больше для тех, кого вы случайно натыкаетесь на это.
При использовании @Transactional для сохранения данных в базу данных сохранение в базе данных фактически не происходит до тех пор, пока метод не вернется, а не при вызове сохранения.
Итак, если у вас есть метод типа
@Transactional(readOnly=false)
public void save(Object object) { //Object should be one of your entities
entityManager.persist(object); //or however you have it set up
rabbitTemplate.convertAndSend(message); //again - however yours is
}
даже если вы вызываете упор на объекте до того, как вы поместите сообщение в очередь, то оно не произойдет до тех пор, пока метод не вернется, что приведет к тому, что сообщение будет помещено в очередь до того, как метод вернется и до данных фактически находится в базе данных.
Можно вложить методы @Transactional (не прямолинейно), чтобы отправить сообщение в очередь после возврата метода save()
. Однако вы не можете поместить сообщение в очередь и ожидать, что он не будет потреблен. Как только его там нет. Поэтому задержите его в очереди, если вам нужно.
Если вы хотите получить ответ из очереди синхронно. В моем примере с функцией - вы можете это сделать, однако это приведет к тому, что для фактического сохранения данных потребуется больше времени, так как он будет ожидать ответа от рабочего, прежде чем метод сможет вернуться и фактически сохранить данные. (также помните, что получение ответа от сообщения в очереди имеет тайм-аут).
Поэтому я советую не помещать эти 2 операции в один и тот же @Transactional