Kafka получить список разделов для темы
Как я могу получить количество разделов для любой темы кафки из кода. Я исследовал множество ссылок, но никто не работает.
Упомянем несколько:
http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api
http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic
http://qnalist.com/info/5809219/get-replication-and-partition-count-of-a-topic
которые похожи на аналогичные обсуждения.
Также есть похожие ссылки на SO, у которых нет рабочего решения.
Ответы
Ответ 1
Перейдите в каталог kafka/bin
.
Затем запустите это:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
Вы должны увидеть, что вам нужно в PartitionCount
.
Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:
Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Ответ 2
В API-интерфейсе 0,82 Producer и 0.9 Consumer api вы можете использовать что-то вроде
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")
Ответ 3
Вот как я это делаю:
/**
* Retrieves list of all partitions IDs of the given {@code topic}.
*
* @param topic
* @param seedBrokers List of known brokers of a Kafka cluster
* @return list of partitions or empty list if none found
*/
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
for (BrokerInfo seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<Integer> partitions = new ArrayList<>();
// find our partition metadata
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
partitions.add(part.partitionId());
}
}
return partitions; // leave on first successful broker (every broker has this info)
} catch (Exception e) {
// try all available brokers, so just report error and go to next one
LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
throw new RuntimeError("Could not get partitions");
}
Обратите внимание, что мне просто нужно было извлечь идентификаторы разделов, но вы можете дополнительно получить любые другие метаданные раздела, такие как leader
, isr
, replicas
,...
А BrokerInfo
- это простой POJO, имеющий поля host
и port
.
Ответ 4
Таким образом, для kafka 0.10 работает следующий подход, и он не использует никаких API-интерфейсов производителя или потребителя. Он использует некоторые классы из API scala в kafka, таких как ZkConnection и ZkUtils.
ZkConnection zkConnection = new ZkConnection(zkConnect);
ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());
Ответ 5
Ниже оболочки cmd можно напечатать количество разделов. Вы должны быть в каталоге kafka bin перед выполнением cmd:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'
Обратите внимание, что вам нужно изменить название темы в соответствии с вашими потребностями. Вы также можете подтвердить это, используя условие:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'
Вышеупомянутая команда cmd выводит допустимые разделы, если счет равен 16. Вы можете изменить счет в зависимости от вашего требования.
Ответ 6
В Java-коде мы можем использовать AdminClient
чтобы получить сумму разделов по одной теме.
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);
DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>> values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);
Ответ 7
У меня была такая же проблема, когда мне нужно было получить разделы для темы.
С помощью ответа здесь я смог получить информацию от Zookeeper.
Вот мой код в Scala (но может быть легко переведен на Java)
import org.apache.zookeeper.ZooKeeper
def extractPartitionNumberForTopic(topicName: String, zookeeperQurom: String): Int = {
val zk = new ZooKeeper(zookeeperQurom, 10000, null);
val zkNodeName = s"/brokers/topics/$topicName/partitions"
val numPartitions = zk.getChildren(zkNodeName, false).size
zk.close()
numPartitions
}
Используя этот подход, я смог получить доступ к информации о тематиках Kafka, а также другую информацию о брокерах Kafka...
Из Zookeeper вы можете проверить количество разделов для темы, просмотрев /brokers/topics/MY_TOPIC_NAME/partitions
Использование zookeeper-client.sh
для подключения к вашему zookeeper:
[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]
Это показывает нам, что существует 3 раздела для темы MY_TOPIC_NAME
Ответ 8
Количество разделов можно получить из zookeeper-shell
Syntax: ls /brokers/topics/<topic_name>/partitions
Ниже приведен пример:
[email protected]:/opt/kafka_2.11-2.0.0# bin/zookeeper-shell.sh zookeeper-01:2181
Connecting to zookeeper-01:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4]
Ответ 9
@Ответ Сунил-патиль перестает отвечать на его часть. Вы должны получить размер списка
производитель .partitionsFor( "test" ). size()
@vish4071 no point butting Sunil, вы не упоминали, что используете этот вопрос в ConsumerConnector.
Ответ 10
Вы можете изучить kafka.utils.ZkUtils
, который имеет множество методов, направленных на извлечение метаданных о кластере. Ответ здесь приятный, поэтому я просто добавляю для разнообразия:
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = {
val client = new ZkClient(zookeeperQuorum)
val partitionCount = ZkUtils.getAllPartitions(client)
.count(topicPartitionPair => topicPartitionPair.topic == topic)
client.close
partitionCount
}
Ответ 11
cluster.availablePartitionsForTopic(topicName).size()
Ответ 12
//create the kafka producer
def getKafkaProducer: KafkaProducer[String, String] = {
val kafkaProps: Properties = new Properties()
kafkaProps.put("bootstrap.servers", "localhost:9092")
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String, String](kafkaProps)
}
val kafkaProducer = getKafkaProducer
val noOfPartition = kafkaProducer.partitionsFor("TopicName")
println(noOfPartition) //it will print the number of partiton for the given
//topic
Ответ 13
Используйте PartitionList от KafkaConsumer
//create consumer then loop through topics
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
ArrayList<Integer> partitionList = new ArrayList<>();
System.out.println(partitions.get(0).partition());
for(int i = 0; i < partitions.size(); i++){
partitionList.add(partitions.get(i).partition());
}
Collections.sort(partitionList);
Должен работать как шарм. Дайте мне знать, если есть более простой способ получить доступ к списку разделов из темы.