Хорошая практика при использовании kafka с jpa
Я сейчас в проекте, где используются JPA и Kafka. Я пытаюсь найти набор хороших практик для объединения этих операций.
В существующем коде производитель используется в той же транзакции, что и jpa, однако из того, что я прочитал, кажется, что они не делят транзакцию.
@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
Xdto dto = xService.create(request);
kafkaProducer.putToQueue(dto, Type.CREATE);
return dto;
}
где производитель кафки определяется следующим образом:
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Type> template;
public void putToQueue(Dto dto, Type eventType) {
template.send("event", new Event(dto, eventType));
}
}
Является ли это допустимым прецедентом для объединения jpa и kafka, правильно ли определены границы транзакций?
Ответы
Ответ 1
это не будет работать так, как предполагалось, когда транзакция завершится с ошибкой. взаимодействие kafka не является частью транзакции.
Возможно, вам захочется взглянуть на TransactionalEventListener. Вы можете написать сообщение kafka в событии AFTER_COMMIT. даже тогда публикация кафки может потерпеть неудачу.
Другой вариант - написать db с помощью jpa, как вы это делаете. Пусть дебезий читает обновленные данные из вашей базы данных и подталкивает их к кафке. Мероприятие будет в другом формате, но гораздо более богаче.
Ответ 2
Рассматривая свой вопрос, я предполагаю, что вы пытаетесь достичь CDC (Change Data Capture) вашей OLTP-системы, то есть регистрировать все изменения, которые происходят в транзакционной базе данных. Есть два способа приблизиться к этому.
- Код приложения выполняет двойную запись в транзакционную БД, а также в Kafka. Это противоречиво и препятствует производительности. Непоследовательно, потому что, когда вы делаете двойную запись в две независимые системы, данные закручиваются, когда одна из записей не выполняется, а передача данных в Kafka в потоке транзакций добавляет латентность, которую вы не хотите компрометировать.
- Извлечь изменения из фиксации БД (триггеры базы данных/уровня приложения или журнал транзакций) и отправить его в Kafka. Он очень последователен и не влияет на вашу транзакцию. Согласовано, поскольку журналы фиксации БД представляют собой отражения транзакций БД после успешных коммитов. Есть много решений, доступных которых использовать этот подход, как шины данных, Maxwell, debezium и т.д.
Если CDC - ваш прецедент, попробуйте использовать любое из уже доступных решений.
Ответ 3
Вы не должны помещать отправляющее сообщение в kafka в транзакции. Если вам нужна логика, когда, если ей не удается отправить событие в kafka, а затем верните транзакцию, в этом случае будет лучше использовать spring-retry
. Просто поместите код, связанный с отправляющим событием, в @Retryable
аннотированном методе @Retryable
, а также добавьте аннотированный метод @Recover
с логикой возврата изменений в БД, сделанных ранее.