Что определяет потребительское смещение Кафки?
Я относительно новичок в Кафке. Я немного поработал с этим, но некоторые вещи неясны для меня относительно потребительского смещения. Из того, что я понял, когда потребитель начинает, смещение, которое он начнет считывать, определяется настройкой конфигурации auto.offset.reset
(исправьте меня, если я ошибаюсь).
Теперь скажите, например, что в этой теме 10 сообщений (смещения от 0 до 9), а потребителю приходилось потреблять 5 из них до того, как он упал (или до того, как я убил потребителя). Затем скажите, что я перезапускаю этот потребительский процесс. Мои вопросы:
Если для параметра auto.offset.reset
установлено значение smallest
, он всегда начинает потреблять со смещения 0?
Если для параметра auto.offset.reset
установлено значение largest
, он начнет потреблять со смещения 5?
Является ли поведение в отношении такого сценария всегда детерминированным? Пожалуйста, не стесняйтесь комментировать, если что-либо в моем вопросе неясно. Спасибо заранее.
Ответы
Ответ 1
Это немного сложнее, чем вы описали. Конфигурация auto.offset.reset
запускается ТОЛЬКО, если ваша группа потребителей не имеет допустимого смещения, зафиксированного где-то (2 поддерживаемых офсетных хранилища теперь являются Kafka и Zookeeper). И это также зависит от того, какой потребитель вы используете.
Если вы используете высокопоставленного Java-потребителя, тогда представьте следующие сценарии:
-
У вас есть потребитель в группе потребителей group1
, которая потребляла 5 сообщений и умерла. В следующий раз, когда вы запустите этого пользователя, он даже не будет использовать эту конфигурацию auto.offset.reset
и продолжит работу с места, где она умерла, потому что она просто выберет сохраненное смещение из хранилища смещения (Kafka или ZK, как я упоминал).
-
У вас есть сообщения в теме (как вы описали), и вы запускаете пользователя в новой группе потребителей group2
. Нет никакого смещения, хранящегося в любом месте, и на этот раз config auto.offset.reset
будет решать, начинать ли с начала темы (smallest
) или с конца темы (largest
)
Еще одна вещь, которая влияет на то, какое значение смещения будет соответствовать конфигурациям smallest
и largest
, - это политика сохранения журнала. Представьте, что у вас есть тема с сохранением, настроенная на 1 час. Вы создаете 5 сообщений, а затем через час вы отправляете еще 5 сообщений. Смещение largest
будет по-прежнему оставаться таким же, как в предыдущем примере, но smallest
не сможет быть 0
, потому что Kafka уже удалит эти сообщения, и, таким образом, наименьшее доступное смещение будет 5
.
Все упомянутое выше не связано с SimpleConsumer
, и каждый раз, когда вы его запускаете, он решает, с чего начать использовать конфигурацию auto.offset.reset
.
Ответ 2
Просто обновление: с Kafka 0.9 и дальше Kafka использует новую версию Java-потребителя, а имена параметров auto.offset.reset изменены; Из руководства:
Что делать, если в Кафке нет начального смещения, или если текущий смещение больше не существует на сервере (например, поскольку эти данные был удален):
самый ранний: автоматически reset смещение к самому раннему смещению
последняя: автоматически reset смещение до последнего смещения
none: вызывать исключение для потребителя, если не найдено никакого предыдущего смещения для группы потребителей
что-нибудь еще: исключение для потребителя.
Я потратил некоторое время, чтобы найти это после проверки ответа выше, поэтому я подумал, что может быть полезно сообществу опубликовать его.
Ответ 3
Дальше еще там offsets.retention.minutes. Если время с момента последнего фиксации → offsets.retention.minutes
, то auto.offset.reset
также пинает