Как я могу изящно справиться с отключением Kafka?

Я подключаюсь к Kafka с помощью библиотеки 0.8.2.1 kafka-clients. Я могу успешно подключиться, когда Кафка встал, но я хочу обработать неудачу изящно, когда Кафка опущен. Вот моя конфигурация:

kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
producer = new KafkaProducer(kafkaProperties);

Когда Kafka выключен, в моих журналах появляется следующая ошибка:

WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

Эта ошибка повторяется в бесконечном цикле и блокирует мое приложение Java. Я пробовал различные параметры конфигурации, связанные с тайм-аутами, повторами и подтверждениями, но я не смог предотвратить этот цикл.

Есть ли параметр конфигурации, который может предотвратить это? Нужно ли мне попробовать другую версию клиента? Как можно изящно отрегулировать Kafka?

Ответы

Ответ 1

Я понял, что эта комбинация настроек позволяет клиенту kafka быстро выйти из строя, не удерживая нить или спам журналов:

kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300");
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300");
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000");
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");

Мне не нравится, что клиент kafka держит поток при попытке подключиться к серверу kafka, а не полностью асинхронно, но это по крайней мере функционально.

Ответ 2

В клиенте 0.9 также есть свойство max.block.ms, которое ограничивает время, в течение которого клиент может работать.