Как проверить, запущен ли сервер Kafka?

Я хочу убедиться, что сервер kafka работает или нет, прежде чем запускать производственные и потребительские задания. Это в среде Windows, и вот мой код сервера kafka в eclipse...

Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");    
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);        
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();

В этом случае if (server != null) недостаточно, потому что это всегда верно. Так что есть ли способ узнать, что мой сервер kafka работает и готов к продюсеру. Мне необходимо проверить это, потому что это приводит к потере некоторых пакетов начальных данных.

Спасибо.

Ответы

Ответ 1

Все брокеры Kafka должны быть назначены broker.id. При запуске брокер создаст эфемерный node в Zookeeper с контуром /broker/ids/$id. Поскольку node является эфемерным, он будет удален, как только брокер отключится, например. путем закрытия.

Вы можете просмотреть список эфемерных узлов брокера, например:

echo dump | nc localhost 2181 | grep brokers

Клиентский интерфейс ZooKeeper предоставляет несколько команд; dump перечисляет все сеансы и эфемерные узлы кластера.

Обратите внимание, что вышесказанное предполагает:

  • Вы используете ZooKeeper на порту по умолчанию (2181) на localhost и что localhost является лидером кластера
  • Конфигурация zookeeper.connect Kafka не указывает chroot env для вашего кластера Kafka, то есть просто host:port, а не host:port/path

Ответ 2

Пол отвечает очень хорошо, и на самом деле Кафка и Зк работают вместе с точки зрения брокера.

Я бы сказал, что еще один простой способ проверить, работает ли сервер Kafka, - создать простой KafkaConsumer, указывающий на кластер, и попробовать какое-то действие, например listTopics(). Если сервер kafka не запущен, вы получите TimeoutException, а затем вы можете использовать предложение try-catch.

  def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    simpleConsumer.listTopics()
  }

Ответ 3

Хорошим вариантом является использование AdminClient, как показано ниже, прежде чем начинать производить или потреблять сообщения

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;           
 try (AdminClient client = AdminClient.create(properties)) {
            client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
        } catch (ExecutionException ex) {
            LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS);
            return;
        }

Ответ 4

Я использовал AdminClient api.

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
{
    ListTopicsResult topics = client.listTopics();
    Set<String> names = topics.names().get();
    if (names.isEmpty())
    {
        // case: if no topic found.
    }
    return true;
}
catch (InterruptedException | ExecutionException e)
{
    // Kafka is not available
}

Ответ 5

Я никогда не пользовался Kafka, но то, что я прочитал в инструкциях по созданию экземпляров, говорит:

Подождите несколько секунд, чтобы начать. Вы можете быть уверены, что сервер успешно запущен, когда вы видите следующие сообщения в ~/Kafka/kafka.log

[2015-07-29 06:02:41,736] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-29 06:02:41,776] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

Ответ 6

Из java вы можете легко сделать вызов этой команды:

[[email protected] hsperfdata_kafka]$ /usr/bin/kafka status
Kafka is running with PID=17850.

Оберните его внутри try-catch. Блок исключений выполняется, если вы столкнулись с проблемами с kafka. Используйте "finally", чтобы очистить что-либо от вашей java-программы.

Я попытаюсь поделиться примером кода