Очистить Кафка Тема
Я поместил сообщение, которое было слишком большим, в тему сообщений kafka на моей локальной машине, теперь я получаю сообщение об ошибке:
kafka.common.InvalidMessageSizeException: invalid message size
Увеличение fetch.size
не является идеальным здесь, потому что я не хочу принимать такие большие сообщения. Есть ли способ очистить тему в кафке?
Ответы
Ответ 1
Временно обновите время хранения темы до одной секунды:
kafka-topics.sh --zookeeper <zkhost>:2181 --alter --topic <topic name> --config retention.ms=1000
А в более новых выпусках Kafka вы также можете сделать это с помощью kafka-configs --entity-type topics
kafka-configs.sh --zookeeper <zkhost>:2181 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000
затем подождите, пока очистка не вступит в силу (около минуты). После очистки восстановите предыдущее значение retention.ms
.
Ответ 2
Чтобы очистить очередь, вы можете удалить тему:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
затем заново создайте его:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
Ответ 3
Вот шаги, которые я выполняю, чтобы удалить тему с именем MyTopic
:
- Опишите тему, и не берите идентификаторы брокера
- Остановите демон Apache Kafka для каждого из перечисленных идентификаторов брокера.
- Подключитесь к каждому брокеру и удалите папку данных темы, например,
rm -rf/tmp/kafka-logs/MyTopic-0
. Повторите для других разделов и всех реплик - Удалите метаданные темы:
zkCli.sh
затем rmr/brokers/MyTopic
- Запустите демон Apache Kafka для каждой остановленной машины
Если вы пропустите шаг 3, то Apache Kafka продолжит сообщать о теме как о существующей (например, если вы запустите kafka-list-topic.sh
).
Протестировано с Apache Kafka 0.8.0.
Ответ 4
Хотя принятый ответ верен, этот метод устарел. Конфигурация темы теперь должна быть выполнена с помощью kafka-configs
.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
Конфигурации, установленные с помощью этого метода, могут отображаться с помощью команды
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Ответ 5
Протестировано в Kafka 0.8.2, для примера быстрого запуска:
Сначала добавьте одну строку в файл server.properties в папке config:
delete.topic.enable=true
тогда вы можете запустить эту команду:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Ответ 6
Из кафка 1.1
Очистить тему
bin/kafka-configs.sh --zookeeper localhost: 2181 --alter - темы типа объекта → - имя объекта tp_binance_kline --add-config retention.ms = 100
подождите 1 минуту, чтобы быть уверенным, что кафка очистит тему, удалите конфигурацию, а затем перейдите к значению по умолчанию
bin/kafka-configs.sh --zookeeper localhost: 2181 --alter - темы типа объекта → -entity-name tp_binance_kline --delete-config retention.ms
Ответ 7
ОБНОВЛЕНИЕ: Этот ответ актуален для Кафки 0.6. Для Кафки 0.8 и более поздних версий смотрите ответ @Patrick.
Да, остановите kafka и вручную удалите все файлы из соответствующего подкаталога (это легко найти в каталоге данных kafka). После перезагрузки кафки тема будет пустой.
Ответ 8
Иногда, если у вас насыщенный кластер (слишком много разделов, или используются зашифрованные данные раздела, или используется SSL, или контроллер находится на поврежденном узле, или соединение ненадежно, очистка указанной темы займет много времени),
Я следую этим шагам, особенно если вы используете Avro.
1: Запуск с инструментами Кафки:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: Запустить на узле реестра схемы:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Установите сохранение темы обратно к первоначальной настройке, когда тема пуста.
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
Надеюсь, это кому-то поможет, так как это нелегко рекламировать.
Ответ 9
Самый простой подход - установить дату, когда отдельные файлы журнала будут старше, чем период хранения. Затем брокер должен очистить их и удалить их за вас в течение нескольких секунд. Это дает несколько преимуществ:
- Не нужно сбивать брокеров, это работает во время работы.
- Избегает возможности недопустимых исключений смещения (подробнее об этом ниже).
В моем опыте с Kafka 0.7.x удаление файлов журнала и перезапуск брокера может привести к неверным исключениям смещения для определенных потребителей. Это произойдет из-за того, что брокер перезапустит смещения в нуле (в отсутствие каких-либо существующих файлов журналов), и потребитель, который ранее потреблял эту тему, повторно подключился бы, чтобы запросить конкретное смещение [как только действительное]. Если это смещение выходит за пределы новых журналов тем, тогда вред и потребитель не возобновляются ни в начале, ни в конце. Но если смещение попадает в рамки новых журналов тем, брокер пытается получить набор сообщений, но не получается, потому что смещение не соответствует фактическому сообщению.
Это можно смягчить, также очистив потребительские смещения в zookeeper для этой темы. Но если вам не нужна девственная тема и вы просто хотите удалить существующее содержимое, просто "прикосновение" к нескольким журналам тем намного проще и надежнее, чем останавливать брокеров, удалять журналы тем и очищать определенные узлы zookeeper.
Ответ 10
kafka не имеет прямого метода для темы очистки/очистки (очереди), но может сделать это, удалив эту тему и воссоздав ее.
сначала убедитесь, что файл sever.properties имеет, и если не добавить delete.topic.enable=true
then, Удалить тему
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
затем создайте его снова.
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Ответ 11
Совет Томаса замечательный, но, к сожалению, zkCli
в старых версиях Zookeeper (например, 3.3.6), похоже, не поддерживает rmr
. Например, сравните реализацию командной строки в современный Zookeeper с версия 3.3.
Если вы столкнулись со старой версией Zookeeper, одним из решений является использование клиентской библиотеки, такой как zc.zk для Python. Для людей, не знакомых с Python, вам необходимо установить их с помощью pip или easy_install. Затем запустите оболочку Python (python
), и вы можете сделать:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
или даже
zk.delete_recursive('brokers')
если вы хотите удалить все темы из Kafka.
Ответ 12
Чтобы очистить все сообщения из определенной темы с помощью вашей группы приложений (имя_группы должно совпадать с именем группы приложений kafka).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
Ответ 13
Не удалось добавить в качестве комментария из-за размера:
Не уверен, что это правда, помимо обновления retention.ms и retention.bytes, но я заметил, что политика очистки темы должна быть "удалена" (по умолчанию), если "compact", она будет удерживаться в сообщениях дольше, т.е. Если он "компактный", вам нужно также указать delete.retention.ms.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Также необходимо было отслеживать самые ранние/последние смещения, чтобы они были одинаковыми, чтобы подтвердить, что это успешно произошло, также можете проверить du -h/tmp/kafka-logs/test-topic-3-100 - *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
Другая проблема заключается в том, что вам нужно получить текущую конфигурацию сначала, чтобы вы не смогли вернуться после удаления.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Ответ 14
Другой, довольно ручной подход для очистки темы:
в брокерах:
- стоп кафка брокер
sudo service kafka stop
- удалить все файлы журналов разделов (должно быть сделано на всех брокерах)
sudo rm -R/kafka-storage/kafka-logs/<some_topic_name>-*
в зоопарке:
- запустить интерфейс командной строки zookeeper
sudo/usr/lib/zookeeper/bin/zkCli.sh
- используйте zkCli для удаления метаданных темы
rmr/brokers/topic/<some_topic_name>
снова в брокерах:
- перезапустить брокерскую службу
sudo service kafka start
Ответ 15
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
Это должно дать retention.ms
настроено. Затем вы можете использовать указанную выше команду alter, чтобы изменить значение на 1 секунду (и позже вернуться к значению по умолчанию).
Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000
Ответ 16
Из Java, используя новый AdminZkClient
вместо устаревших AdminUtils
:
public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}
private void deleteTopic(String topic, KafkaZkClient zkClient) {
// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}
System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}
if (!success) {
Assert.fail("failed to create " + topic);
}
}
private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();
return topics;
}
Ответ 17
После ответа @steven appleyard я выполнил следующие команды на Kafka 2.2.0, и они помогли мне.
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms