Простой встроенный пример теста Kafka с загрузкой spring
Редактировать FYI: пример работы с gitHub
Я искал в интернете и не смог найти работающего и простого примера встроенного теста Кафки.
Моя настройка:
- Весенний ботинок
- Несколько @KafkaListener с разными темами в одном классе
- Встроенный Кафка для теста, который начинается нормально
- Тест с Kafkatemplate, который отправляет в тему, но методы @KafkaListener ничего не получают даже после большого времени сна
- Предупреждения или ошибки не отображаются, в журналах только информационный спам от Kafka
Пожалуйста, помогите мне. Есть в основном переконфигурированные или слишком сильные примеры. Я уверен, что это можно сделать просто. Спасибо, парни!
@Controller
public class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
//I will do database stuff here which i could check in db for testing
}
}
private static String SENDER_TOPIC = "test.kafka.topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Thread.sleep(10000);
}
Ответы
Ответ 1
Встраиваемые тесты Kafka работают для меня с ниже configs,
Аннотации к тестовому классу
@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class,or not loaded with test config
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaConsumerTest{
@Autowired
KafkaEmbedded kafkaEmbeded;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
Перед аннотацией для метода настройки
@Before
public void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}
Примечание. Я не использую @ClassRule
для создания встроенной Kafka, а не автоматической проводки
@Autowired embeddedKafka
@Test
public void testReceive() throws Exception {
kafkaTemplate.send(topic, data);
}
Надеюсь, это поможет!
Изменить: тестовый класс конфигурации, помеченный @TestConfiguration
@TestConfiguration
public class TestConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
Теперь метод @Test
будет autowire KafkaTemplate и использовать для отправки сообщения
kafkaTemplate.send(topic, data);
Обновлен блок кода ответа с указанной выше строкой
Ответ 2
Я решил проблему сейчас
@BeforeClass
public static void setUpBeforeClass() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
во время отладки я увидел, что встроенный сервер kaka использует произвольный порт.
Я не смог найти конфигурацию для него, поэтому я настраиваю конфигурацию kafka так же, как сервер. Выглядит все еще немного некрасиво для меня.
Я хотел бы иметь только упомянутую строку @Mayur
@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
но не могу найти нужную зависимость в интернете.
Ответ 3
поскольку принятый ответ не компилируется и не работает для меня. Я нахожу другое решение на основе https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/, которым я хотел бы поделиться с вами.
Зависимость - это версия 'spring-kafka-test': '2.2.7.RELEASE'
@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {
private static final String TEST_TOPIC = "testTopic";
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
public void testReceivingKafkaEvents() {
Consumer<Integer, String> consumer = configureConsumer();
Producer<Integer, String> producer = configureProducer();
producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));
ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo(123);
assertThat(singleRecord.value()).isEqualTo("my-test-value");
consumer.close();
producer.close();
}
private Consumer<Integer, String> configureConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
.createConsumer();
consumer.subscribe(Collections.singleton(TEST_TOPIC));
return consumer;
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}