Как проверить, запущен ли сервер Kafka?
Я хочу убедиться, что сервер kafka работает или нет, прежде чем запускать производственные и потребительские задания. Это в среде Windows, и вот мой код сервера kafka в eclipse...
Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();
В этом случае if (server != null)
недостаточно, потому что это всегда верно. Так что есть ли способ узнать, что мой сервер kafka работает и готов к продюсеру. Мне необходимо проверить это, потому что это приводит к потере некоторых пакетов начальных данных.
Спасибо.
Ответы
Ответ 1
Все брокеры Kafka должны быть назначены broker.id
. При запуске брокер создаст эфемерный node в Zookeeper с контуром /broker/ids/$id
. Поскольку node является эфемерным, он будет удален, как только брокер отключится, например. путем закрытия.
Вы можете просмотреть список эфемерных узлов брокера, например:
echo dump | nc localhost 2181 | grep brokers
Клиентский интерфейс ZooKeeper предоставляет несколько команд; dump
перечисляет все сеансы и эфемерные узлы кластера.
Обратите внимание, что вышесказанное предполагает:
- Вы используете ZooKeeper на порту по умолчанию (
2181
) на localhost
и что localhost
является лидером кластера
- Конфигурация
zookeeper.connect
Kafka не указывает chroot env для вашего кластера Kafka, то есть просто host:port
, а не host:port/path
Ответ 2
Пол отвечает очень хорошо, и на самом деле Кафка и Зк работают вместе с точки зрения брокера.
Я бы сказал, что еще один простой способ проверить, работает ли сервер Kafka, - создать простой KafkaConsumer, указывающий на кластер, и попробовать какое-то действие, например listTopics(). Если сервер kafka не запущен, вы получите TimeoutException, а затем вы можете использовать предложение try-catch
.
def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
val props = new Properties()
props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
props.put("group.id", kafkaParams.get("group.id").get.toString)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val simpleConsumer = new KafkaConsumer[String, String](props)
simpleConsumer.listTopics()
}
Ответ 3
Хорошим вариантом является использование AdminClient, как показано ниже, прежде чем начинать производить или потреблять сообщения
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
try (AdminClient client = AdminClient.create(properties)) {
client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
} catch (ExecutionException ex) {
LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS);
return;
}
Ответ 4
Я использовал AdminClient api.
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
{
ListTopicsResult topics = client.listTopics();
Set<String> names = topics.names().get();
if (names.isEmpty())
{
// case: if no topic found.
}
return true;
}
catch (InterruptedException | ExecutionException e)
{
// Kafka is not available
}
Ответ 5
Я никогда не пользовался Kafka, но то, что я прочитал в инструкциях по созданию экземпляров, говорит:
Подождите несколько секунд, чтобы начать. Вы можете быть уверены, что сервер успешно запущен, когда вы видите следующие сообщения в ~/Kafka/kafka.log
[2015-07-29 06:02:41,736] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-29 06:02:41,776] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
Ответ 6
Из java вы можете легко сделать вызов этой команды:
[[email protected] hsperfdata_kafka]$ /usr/bin/kafka status
Kafka is running with PID=17850.
Оберните его внутри try-catch. Блок исключений выполняется, если вы столкнулись с проблемами с kafka.
Используйте "finally", чтобы очистить что-либо от вашей java-программы.
Я попытаюсь поделиться примером кода