Идентификатор группы по умолчанию Kafka Consumer
Я работаю с Apache Kafka и его Java-клиентом, и я вижу, что сообщения сбалансированы по нагрузке у разных потребителей Kafka, принадлежащих к одной и той же группе (то есть, используя один и тот же идентификатор группы).
В моем приложении мне нужно, чтобы все пользователи читали все сообщения.
Поэтому у меня есть несколько вопросов:
-
если я не устанавливаю идентификатор группы в свойствах "Потребительские", какой идентификатор группы будет указан Kafka Consumer?
-
Есть ли одно значение по умолчанию?
-
Создает ли клиент случайное значение каждый раз?
-
Нужно ли мне создавать разные идентификаторы для каждого потребителя, чтобы быть уверенным, что каждый получает все сообщения?
EDIT: Спасибо за ваши ответы.
Вы правы: если человек не задает идентификатор группы потребителей, Кафка должен жаловаться.
Однако я обнаружил, что если идентификатор группы равен NULL, клиент Java устанавливает его в пустую строку "", чтобы избежать проблем. По-видимому, это значение по умолчанию, которое я искал.
Удивляя всех моих потребителей, даже если я не устанавливаю их groupIds (и поэтому они все с groupId == "), похоже, получают все сообщения, которые производитель пишет.
Я все еще не могу объяснить это: любые предложения?
Ответы
Ответ 1
если я не устанавливаю идентификатор группы в свойствах "Потребительские", какой идентификатор группы будет указан Kafka Consumer?
У потребителя кафки не будет никакой группы потребителей. Вместо этого вы получите эту ошибку: The configured groupId is invalid
Есть ли одно значение по умолчанию?
Да, вы можете увидеть consumer.properties
файл Кафки для справки. Идентификатор группы group.id=test-consumer-group
умолчанию: group.id=test-consumer-group
Создает ли клиент случайное значение каждый раз?
Нет, groupId, по-видимому, является обязательным для Java-клиента, начинающего пользователей Kafka 0.9.0.x. Вы можете обратиться к этой JIRA: https://issues.apache.org/jira/browse/KAFKA-2648
Нужно ли мне создавать разные идентификаторы для каждого потребителя, чтобы убедиться, что каждый из них получает все сообщения?
Да, если все потребители используют один и тот же идентификатор группы, сообщения в теме распространяются среди этих потребителей. Другими словами, каждый потребитель получит неперекрывающееся подмножество сообщений. Наличие большего количества потребителей в одной группе увеличивает степень параллелизма и общую пропускную способность потребления. С другой стороны, если каждый потребитель находится в своей собственной группе, каждый потребитель получит полную копию всех сообщений.
Ответ 2
Не хочу повторять другие ответы, но просто укажу на что-то: вам на самом деле не нужна группа потребителей, чтобы потреблять все сообщения. API Kafka Consumer
(при условии, что мы имеем дело с Java) имеет метод subscribe()
и assign()
. Если вы хотите, чтобы все потребители получали все сообщения без балансировки нагрузки (для этого, в сущности, и предназначены группы потребителей), вы можете просто вызвать assign()
для всех потребителей, передав ему все разделы для темы, за которыми следует seek()
установить смещения; таким образом ваши потребители получат все сообщения.
Таким образом, Kafka не будет управлять назначением разделов и не будет сохранять смещения - за все это отвечает потребитель. В зависимости от вашего варианта использования, это может быть лучшим подходом по сравнению с наличием группы потребителей на одного потребителя.
Ответ 3
Если вы не установите group.id, вы получите ошибку при потреблении данных темы.
org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group
22:08:14.132 [testAuto-kafka-consumer-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=,session_timeout=15000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator bogon:9092 (id: 2147483647 rack: null)
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to join group failed due to fatal error: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception
Ответ 4
У меня такая же проблема. И потребовалось некоторое время, чтобы исследовать этот вопрос. В проекте spring-cloud-stream
будет проверяться, установлен ли идентификатор группы для потребителя. Если нет, spring-cloud-stream
создаст случайное значение как идентификатор группы. Пожалуйста, обратитесь к методу createConsumerEndpoint
в классе KafkaMessageChannelBinder.