Проблема ребалансировки при чтении сообщений в Кафке

Я пытаюсь читать сообщения по теме Кафки, но я не могу ее прочитать. Процесс убивается через некоторое время, не читая никаких сообщений.

Вот ошибка перебалансировки, которую я получаю:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

Я попытался запустить ConsumerOffsetChecker, и это ошибка, которую я получаю. Я понятия не имею, как решить это. Кто-нибудь, любая идея?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more

Ответы

Ответ 1

В последнее время у меня есть аналогичные проблемы. Вы можете попытаться увеличить конфигурации пользователя rebalance.backoff.ms и zookeeper.session.timeout.ms примерно до 5-10 секунд.

Первый параметр говорит кафке подождать еще до повторной ребалансировки. Второй говорит кафке, чтобы быть более терпеливым, пытаясь подключиться к zookeeper.

Другие параметры конфигурации можно найти в официальной документации.

Ответ 2

Это, вероятно, означает, что брокеры не правильно создали эти узлы, когда подключились к Zookeeper. Путь/потребитель должен существовать, когда вы пытаетесь потреблять.

Вот несколько путей для отладки:

Создавали ли вы какие-либо темы?

Если да:

  • Сколько разделов есть в теме?
  • Вы проверяли, правильно ли заполнены метаданные в zookeeper?
  • Можем ли мы увидеть вашу потребительскую конфигурацию?

Если нет:

  • Затем вам нужно создать тему, используя script $KAFKA_DIR/bin/kafka-create-topic.sh. Просмотрите сведения об использовании script.
  • После создания темы вам необходимо создать пользователя с идентификатором группы, который ранее не использовался, иначе вы не начнете новый.

Ответ 3

В файле kafka.tools.ConsumerOffsetChecker есть ошибка. Если конкретная запись Zookeeper node, содержащая информацию о смещенных смещениях, не выходит из строя, инструмент завершает выполнение функции.

Например, предположим, что у вас есть потребительская группа "mygroup" и тема "topictest". Затем смещение для раздела 2 поддерживается в Znode: /Потребители/MyGroup/Смещение/topictest/2.

Если в разделе Znode нет записи для раздела 2 темы topictest, тогда утилита offsetchecker для пользователя выйдет, проверяя смещение для раздела 2. В основном, он не работает, проверяя первый раздел "n", для которого Zooode/consumer/mygroup/offsets/topictest/n отсутствует в Zookeeper.

Ответ 4

Вероятно, ваши брокеры находятся в автономном режиме, и они не могут подключиться к Zookeeper, попробовали ли вы запустить консоль-потребитель script, доступный в пути $KAFKA_ROOT_DIR/bin, чтобы проверить, можете ли вы использовать конкретную тему.