Ошибка Zookeeper и Kafka KeeperErrorCode = NodeExists

Я написал a kafka consumer и producer, который отлично работает до сегодняшнего дня. Сегодня утром, когда я начал zooekeeper и kafka, моему потребителю не удалось прочитать сообщения, а в Zookeeper log я прочитал эту ошибку

INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

Не могли бы вы мне помочь? Что могло измениться всего за несколько дней? Я не понимаю. Большое вам спасибо.

Ответы

Ответ 1

У меня была эта ошибка в моей Kafka 2.11, работающей в Windows 7. Я думаю, что это исключение не является проблемой, поскольку это только информационный уровень. Просто убедитесь, что брокер все еще работает. Даже с этой ошибкой я все еще мог:

  • Создайте и перечислите тему kafka-topics.bat.
  • Используйте тему kafka-console-consumer.bat.
  • Программно отправить сообщение producer.send(new ProducerRecord<String, String>("topic", "hello")).

Ответ 2

В моем случае это влияет на функциональность, так как я не могу потреблять сообщения. Смотрите код ниже

Vertx instance = VertxConfig.getInstance();

    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "самый ранний"); //consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    Properties producerConfig = new Properties();
    producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerConfig.put("acks", "1");

    String topic = "dstv-queue-3";
    consumer = KafkaConsumer.create(instance, consumerConfig);
    producer = KafkaProducer.create(instance, producerConfig, String.class, String.class);
    consumer.subscribe(topic);

    instance.setPeriodic(2000, worker -> {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message");
        producer.write(record, writeHandler -> {
            RecordMetadata metadata = writeHandler.result();

            //if meta data returned..
            if (metadata != null) {
                long offset = metadata.getOffset();
                int partition = metadata.getPartition();
                System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition);
            }
        });
    });

    AtomicLong counter = new AtomicLong();
    consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received"));