Kafka 0.11 как reset смещения
Я пытаюсь использовать reset потребительское смещение с последними инструментами CLI для Kafka.
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics
В результате я вижу этот вывод:
TOPIC PARTITION NEW-OFFSET
FirstTopic 0 0
SecondTopic 0 0
Но снова запустите команду:
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe
выводит результат:
Consumer group 'my-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
FirstTopic 0 1230 1230 0
SecondTopic 0 1022 1022 0
Я пробовал другие параметры, такие как сброс в явное смещение или непосредственное указание темы, но результат такой же. Вывод подсказывает, что операция выполняется при проверке смещений с помощью команды или отладки, показывает, что смещение не изменилось.
Кто-нибудь преуспеет в том, что сброс потребительского смещения в брокерах без zookeeper.
Ответы
Ответ 1
По умолчанию --reset-offsets
просто печатает результат операции. Чтобы выполнить операцию, вам нужно добавить --execute
к вашей команде:
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute
Ответ 2
Хотя принятый ответ отлично отвечает на вопрос OP, есть больше параметров, доступных для сброса смещений. Поэтому добавление этого ответа расширяет принятый ответ.
Сброс смещения всех тем до самого раннего в группе потребителей
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-earliest --all-topics --execute
Сброс смещения определенной темы до самого раннего в группе потребителей
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-earliest --topic <my-topic> --execute
Сброс смещения определенной темы на конкретное смещение в группе потребителей
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute
Другие поддерживаемые аргументы:
--shift-by [положительное или отрицательное целое число] - смещает смещение вперед или назад от заданного целого числа.
- в текущий и - в последний такие же, как - в смещение и - в самый ранний.
--to-datetime [Формат даты и времени: гггг-мм-ддтчч: мм: сс.ххх]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
- by-duration [Формат: PnDTnHnMnS]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute
Сброс на смещение по длительности от текущей отметки времени.
Как проверить?
Используйте приведенную ниже команду, чтобы проверить текущий/конец смещений и подтвердить сброс выполненных изменений.
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Пример вывода:
Consumer group 'group1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
intro 0 0 99 99 - - -
Возможные ошибки:
Ошибка: назначения могут быть сброшены, только если группа '[имя_группы]' неактивна, но текущее состояние - Стабильно.
"Стабильный" означает, что в этой группе работает активный потребитель. Поэтому сначала нужно остановить активных потребителей и повторить попытку сброса смещения.
невозможно сбросить смещения, если есть активный потребитель для группы потребителей.