Идентификатор группы по умолчанию Kafka Consumer

Я работаю с Apache Kafka и его Java-клиентом, и я вижу, что сообщения сбалансированы по нагрузке у разных потребителей Kafka, принадлежащих к одной и той же группе (то есть, используя один и тот же идентификатор группы).

В моем приложении мне нужно, чтобы все пользователи читали все сообщения.

Поэтому у меня есть несколько вопросов:

  • если я не устанавливаю идентификатор группы в свойствах "Потребительские", какой идентификатор группы будет указан Kafka Consumer?

  • Есть ли одно значение по умолчанию?

  • Создает ли клиент случайное значение каждый раз?

  • Нужно ли мне создавать разные идентификаторы для каждого потребителя, чтобы быть уверенным, что каждый получает все сообщения?

EDIT: Спасибо за ваши ответы.

Вы правы: если человек не задает идентификатор группы потребителей, Кафка должен жаловаться.

Однако я обнаружил, что если идентификатор группы равен NULL, клиент Java устанавливает его в пустую строку "", чтобы избежать проблем. По-видимому, это значение по умолчанию, которое я искал.

Удивляя всех моих потребителей, даже если я не устанавливаю их groupIds (и поэтому они все с groupId == "), похоже, получают все сообщения, которые производитель пишет.

Я все еще не могу объяснить это: любые предложения?

Ответы

Ответ 1

если я не устанавливаю идентификатор группы в свойствах "Потребительские", какой идентификатор группы будет указан Kafka Consumer?

У потребителя кафки не будет никакой группы потребителей. Вместо этого вы получите эту ошибку: The configured groupId is invalid

Есть ли одно значение по умолчанию?

Да, вы можете увидеть consumer.properties файл Кафки для справки. Идентификатор группы group.id=test-consumer-group умолчанию: group.id=test-consumer-group

Создает ли клиент случайное значение каждый раз?

Нет, groupId, по-видимому, является обязательным для Java-клиента, начинающего пользователей Kafka 0.9.0.x. Вы можете обратиться к этой JIRA: https://issues.apache.org/jira/browse/KAFKA-2648

Нужно ли мне создавать разные идентификаторы для каждого потребителя, чтобы убедиться, что каждый из них получает все сообщения?

Да, если все потребители используют один и тот же идентификатор группы, сообщения в теме распространяются среди этих потребителей. Другими словами, каждый потребитель получит неперекрывающееся подмножество сообщений. Наличие большего количества потребителей в одной группе увеличивает степень параллелизма и общую пропускную способность потребления. С другой стороны, если каждый потребитель находится в своей собственной группе, каждый потребитель получит полную копию всех сообщений.

Ответ 2

Не хочу повторять другие ответы, но просто укажу на что-то: вам на самом деле не нужна группа потребителей, чтобы потреблять все сообщения. API Kafka Consumer (при условии, что мы имеем дело с Java) имеет метод subscribe() и assign(). Если вы хотите, чтобы все потребители получали все сообщения без балансировки нагрузки (для этого, в сущности, и предназначены группы потребителей), вы можете просто вызвать assign() для всех потребителей, передав ему все разделы для темы, за которыми следует seek() установить смещения; таким образом ваши потребители получат все сообщения.

Таким образом, Kafka не будет управлять назначением разделов и не будет сохранять смещения - за все это отвечает потребитель. В зависимости от вашего варианта использования, это может быть лучшим подходом по сравнению с наличием группы потребителей на одного потребителя.

Ответ 3

Если вы не установите group.id, вы получите ошибку при потреблении данных темы.

org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group 
22:08:14.132 [testAuto-kafka-consumer-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=,session_timeout=15000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator bogon:9092 (id: 2147483647 rack: null)
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to join group  failed due to fatal error: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception

Ответ 4

У меня такая же проблема. И потребовалось некоторое время, чтобы исследовать этот вопрос. В проекте spring-cloud-stream будет проверяться, установлен ли идентификатор группы для потребителя. Если нет, spring-cloud-stream создаст случайное значение как идентификатор группы. Пожалуйста, обратитесь к методу createConsumerEndpoint в классе KafkaMessageChannelBinder.

Image