Кафка - Как совершить смещение после каждого сообщения с использованием потребителя высокого уровня?
Я использую потребитель высокого уровня Kafka. Поскольку я использую Kafka как "очередь транзакций" для своего приложения, мне нужно обязательно убедиться, что я не пропущу и не перечитаю какие-либо сообщения. У меня есть 2 вопроса:
-
Как скопировать смещение в zookeeper? После каждого успешного использования сообщения я отключу автоматическое фиксацию и фиксацию фиксации. Кажется, я не могу найти фактические примеры кода, как это сделать, используя потребитель высокого уровня. Может ли кто-нибудь помочь мне с этим?
-
С другой стороны, я слышал, что фиксация в zookeeper может быть медленной, так что другой способ может заключаться в локальном отслеживании смещений? Является ли этот альтернативный метод целесообразным? Если да, как бы вы к нему подошли?
Ответы
Ответ 1
Есть два соответствующих параметра из http://kafka.apache.org/documentation.html#consumerconfigs.
auto.commit.enable
и
auto.commit.interval.ms
Если вы хотите установить его таким образом, чтобы потребитель фиксировал смещение после каждого сообщения, это будет сложно, поскольку единственный параметр после интервала таймера не после каждого сообщения. Вам нужно будет сделать некоторое предсказание скорости входящих сообщений и соответственно установить время.
В общем, не рекомендуется, чтобы этот интервал был слишком мал, потому что он значительно увеличивает скорость чтения/записи в zookeeper, а zookeeper замедляется, потому что он сильно согласуется с его кворумом.
Ответ 2
Вы можете сначала отключить автоматическую фиксацию: auto.commit.enable=false
Затем зафиксируйте после извлечения сообщения: consumer.commitOffsets(true)