Потребительские извлечения метаданных Kafka по темам не удались
Я пытаюсь написать клиент Java для сторонних серверов Kafka и ZooKeeper. Я могу перечислить и описать темы, но когда я пытаюсь прочитать их, появляется ClosedChannelException
. Я воспроизвожу их здесь с клиентом командной строки.
$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Выполняются альтернативные команды:
$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: eventbustopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: eventbustopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic
(Ипы были отредактированы и заменены на 255.255.255.255)
Когда я отправляю это исключение, я вижу проблемы со стороны производителя - действительно, источник для ClientUtils.fetchTopicMetadata
предполагает, что это в основном используется производителями.
Одна из проблем, которые возникают у меня, это то, что это может быть продукт сетевого макета: пакеты искалечены Haproxy и отправляются через VPN.
Что именно здесь работает?
Ответы
Ответ 1
Брокер сообщает клиенту, какое имя хоста следует использовать для создания/потребления сообщений. По умолчанию Kafka использует имя хоста системы, в которой он работает. Если это имя хоста не может быть разрешено на стороне клиента, вы получите это исключение.
Вы можете попробовать установить advertised.host.name
в конфигурации Kafka на имя/адрес хоста, которые должны использовать клиенты.
Ответ 2
Вот мой способ решить эту проблему:
- запустите
bin/kafka-server-stop.sh
, чтобы остановить запуск сервера kafka.
- измените файл свойств
config/server.properties
, добавив строку:
listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
- перезапустить сервер kafka.
Так как без настройки lisener kafka будет использовать java.net.InetAddress.getCanonicalHostName()
для получения адреса, который слушает сервер сокета.
Ответ 3
У вас проблема с Zookeeper. 255.255.255.255:2181
не является допустимым адресом Zookeeper; это широковещательный адрес в вашей сети или маске подсети. Чтобы все было в порядке, найдите IP-адрес или имя хоста на компьютере, на котором запущен Zookeeper.
Ответ 4
Ran в эту ошибку на AWS. Проблема в том, что я был чрезмерно ограничительным с группой безопасности и установил порты 2181 и 9092 на "мой IP". Это означало, что экземпляр kafka не смог найти ZK в одном окне.
Решение - откройте его - немного.