Как мы можем создать тему в Kafka из IDE с помощью API
Как мы можем создать тему в Kafka из IDE с помощью API, потому что когда я это делаю:
bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181
Я получаю сообщение об ошибке:
bash: bin/kafka-create-topic.sh: No such file or directory
И я следил за настройкой разработчика, как есть.
Ответы
Ответ 1
В Kafka 0.8.1+ - последняя версия Kafka на сегодняшний день - вы можете программно создать новую тему через AdminCommand
. Функциональность CreateTopicCommand
(часть старого Kafka 0.8.0), упомянутая в одном из предыдущих ответов на этот вопрос, была перенесена на AdminCommand
.
Scala пример для Kafka 0.8.1:
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
// createTopic() will only seem to work (it will return without error). The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
ZKStringSerializer)
// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)
Создайте зависимости, используя пример sbt:
libraryDependencies ++= Seq(
"com.101tec" % "zkclient" % "0.4",
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri"),
...
)
EDIT: Добавлен пример Java для Kafka 0.9.0.0 (последняя версия с января 2016 года).
Зависимости Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>
код:
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public class KafkaJavaExample {
public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
// createTopic() will only seem to work (it will return without error). The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
// Security for Kafka was added in Kafka 0.9.0.0
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
String topic = "my-topic";
int partitions = 2;
int replication = 3;
Properties topicConfig = new Properties(); // add per-topic configurations settings here
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
zkClient.close();
}
}
EDIT 2: Добавлен пример Java для Kafka 0.10.2.0 (последняя версия по состоянию на апрель 2017 года).
Зависимости Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
</dependency>
код:
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public class KafkaJavaExample {
public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
String topic = "my-topic";
int partitions = 2;
int replication = 3;
Properties topicConfig = new Properties(); // add per-topic configurations settings here
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
// createTopic() will only seem to work (it will return without error). The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
// Security for Kafka was added in Kafka 0.9.0.0
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
zkClient.close();
}
}
Ответ 2
Начиная с 0.11.0.0 все, что вам нужно:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
Этот артефакт теперь содержит AdminClient
(org.apache.kafka.clients.admin
).
AdminClient
может обрабатывать многие задачи администратора Kafka, включая создание тем:
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
AdminClient admin = AdminClient.create(config);
Map<String, String> configs = new HashMap<>();
int partitions = 1;
int replication = 1;
admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));
Результатом этой команды является CreateTopicsResult
, которую вы можете использовать для получения Future
для всей операции или для каждого отдельного создания темы:
- чтобы получить будущее для всей операции, используйте
CreateTopicsResult#all()
.
- чтобы получить
Future
для всех тем в отдельности, используйте CreateTopicsResult#values()
.
Например:
CreateTopicsResult result = ...
KafkaFuture<Void> all = result.all();
или
CreateTopicsResult result = ...
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
try {
entry.getValue().get();
log.info("topic {} created", entry.getKey());
} catch (InterruptedException | ExecutionException e) {
if (Throwables.getRootCause(e) instanceof TopicExistsException) {
log.info("topic {} existed", entry.getKey());
}
}
}
KafkaFuture
- это "гибкое будущее, которое поддерживает цепочку вызовов и другие асинхронные шаблоны программирования" и "в конечном итоге станет тонким подстроем поверх Java 8 CompletebleFuture
".
Ответ 3
Для создания темы через java api и Kafka 0.8+ попробуйте следующее:
Первый импорт ниже оператора
import kafka.utils.ZKStringSerializer$;
Создайте объект для ZkClient следующим образом:
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Ответ 4
Вы можете попробовать с помощью класса kafka.admin.CreateTopicCommand scala создать тему из кода Java... предоставить необходимые аргументы.
String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";
CreateTopicCommand.main(arguments);
NB:. Вы должны добавить зависимости maven для jopt-simple-4.5
и zkclient-0.1
Ответ 5
Если вы используете Kafka 0.10.0.0+, для создания темы из Java требуется передать параметр типа RackAwareMode. Это объект объекта Scala, и получение его экземпляра из Java является сложным (доказательство: Как я могу получить "get" a Scala case-объект из Java? для пример, но это неприменимо для нашего случая).
К счастью, rackAwareMode является необязательным параметром. Однако Java не поддерживает дополнительные параметры. Как мы это решаем? Вот решение:
AdminUtils.createTopic(zkUtils, topic, 1, 1,
AdminUtils.createTopic$default$5(),
AdminUtils.createTopic$default$6());
Используйте его с ответом miguno, и вам хорошо идти.
Ответ 6
Основываясь на последних API-версиях kafka-client и Kafka 1.1.1, рабочая версия кода выглядит следующим образом:
Импортируйте последние kafka-клиенты, используя sbt.
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1",
"org.apache.kafka" %% "kafka" % "1.0.0")
Код для создания темы в Scala:
import java.util.Arrays
import java.util.Properties
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
class CreateKafkaTopic {
def create(): Unit = {
val config = new Properties()
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")
val localKafkaAdmin = AdminClient.create(config)
val partitions = 3
val replication = 1.toShort
val topic = new NewTopic("integration-02", partitions, replication)
val topics = Arrays.asList(topic)
val topicStatus = localKafkaAdmin.createTopics(topics).values()
//topicStatus.values()
println(topicStatus.keySet())
}
}
Подтвердите новую тему, используя:
./kafka-topics.sh --zookeeper 192.30.1.5:2181 --list
Надеюсь, это кому-нибудь пригодится. Ссылка: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html
Ответ 7
От Kafka 0.8 Пример производителя в примере ниже будет создан раздел с именем page_visits
, а также начнется создание, если установлен атрибут auto.create.topics.enable
до true
(по умолчанию) в файле конфигурации Broker config
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
Ответ 8
Несколько способов, по которым ваш вызов не будет работать.
-
Если ваш кластер Kafka не имел достаточного количества узлов для поддержки значения репликации 3.
-
Если есть префикс пути chroot, вы должны добавить его после порта zookeeper
-
Вы работаете в каталоге установки Kafka при запуске (это наиболее вероятно)
Ответ 9
Существует AdminZkClient
который мы можем использовать для управления темами на сервере Kafka.
String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();
adminZkClient.createTopic(topicName1,partitions,replication,
topicConfig,RackAwareMode.Disabled$.MODULE$);
Вы можете ссылаться на эту ссылку для получения подробной информации https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/
Ответ 10
Из какой IDE вы пытаетесь?
Пожалуйста, укажите полный путь, ниже приведена команда из терминала, которая создаст тему
-
cd kafka/bin
-
./kafka-create-topic.sh --topic test --zookeeper localhost:2181
Ответ 11
создание темы в scala. Если вы работаете в режиме seudo в конфигурационном файле брокера
auto.create.topics.enable = true
он будет включать автоматическое создание темы на сервере. Если для этого параметра установлено значение true, попытки создания, потребления или извлечения метаданных для несуществующей темы автоматически создадут его с коэффициентом репликации по умолчанию и количеством разделов.
ниже кода фрагмента.
import scala.util.Random
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date
class SimpleProducer {
def sendmessages(){
val rnd = new Random();
val props = new Properties();
props.put("metadata.broker.list", "192.1.1.1:6667");
props.put("serializer.class", "kafka.serializer.StringEncoder");
//props.put("partitioner.class", "rtbi.dis.producers.SimplePartitioner")
val config = new ProducerConfig(props);
val producer = new Producer[String, String](config);
for (event<-1 to 5000) {
val runtime = new Date().getTime;
val ip = "192.1.1.1" + rnd.nextInt(255);
val msg = runtime + ",www.example.com," + ip;
val data = new KeyedMessage[String, String]("mytopic", ip, msg); //here mytopic is a topic
producer.send(data);
}
producer.close();
}
}
object SimpleProducer extends App{
val s= new SimpleProducer().sendmessages();
}
Ответ 12
Начиная с Kafka 0.10.1 ZKStringSerializer, упомянутый Майклом, является приватным (для Scala). Вы можете использовать методы factory createZkClient или createZkClientAndConnection в ZkUtils.
Scala пример для Kafka 0.10.1:
import kafka.utils.ZkUtils
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
"localhost:2181", sessionTimeoutMs, connectionTimeoutMs)
Затем просто создайте тему, как предложил Майкл:
import kafka.admin.AdminUtils
val zkUtils = new ZkUtils(zkClient, zkConnection, false)
val numPartitions = 4
val replicationFactor = 1
val topicConfig = new Properties
val topic = "my-topic"
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)