Кафка, как читать с темы __consumer_offsets
Я пытаюсь выяснить, что заставляет моих текущих потребителей высокого уровня отработать. Я использую Kafka 0.8.2.1, без "offset.storage", установленного в server.properties Kafka - который, я думаю, означает, что смещения хранятся в Kafka. (Я также подтвердил, что никакие смещения не хранятся в Zookeeper, проверяя этот путь в оболочке Zk: /consumers/consumer_group_name/offsets/topic_name/partition_number
)
Я попытался выслушать тему __consumer_offsets
чтобы узнать, какой потребитель сохраняет значение смещений, но это не сработало...
Я попробовал следующее:
создал конфигурационный файл для пользователя консоли следующим образом:
=> more kafka_offset_consumer.config
exclude.internal.topics=false
и попробовал две версии консольных потребительских скриптов:
#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181
#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
Ни работало - оно просто сидит там, но ничего не печатает, даже если потребители активно потребляют/экономят смещения.
Я пропустил некоторые другие конфигурации/свойства?
Спасибо!
Марина
Ответы
Ответ 1
Я сталкивался с этим вопросом, пытаясь также использовать тему __consumer_offsets. Мне удалось выяснить это для разных версий Kafka и я решил поделиться тем, что нашел
Для Кафки 0.8.2.x
Примечание: это использует соединение Zookeeper
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
Для Кафки 0.9.xx и 0.10.xx
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
Для 0.11.xx - 2.x
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
Ответ 2
Хорошо, я выяснил, в чем проблема. Моя Кафка фактически использовала Zookeeper в качестве хранилища смещения, а не Kafka... Причина, по которой я не обнаружил, что это произошло, потому что я неправильно проверял содержимое ZK:
Я делал
ls /consumers/consumer_group_name/offsets/topic_name/partition_number
и ничего не видит. Вместо этого мне пришлось "получить" контент, который показал правильные смещения для моих потребителей, например, ниже:
get /consumers/consumer_group_name/offsets/topic_name/partition_number
185530404
cZxid = 0x70789ad05
ctime = Mon Nov 23 17:49:46 GMT 2015
mZxid = 0x7216cdc5c
mtime = Thu Dec 03 20:18:57 GMT 2015
pZxid = 0x70789ad05
cversion = 0
dataVersion = 3537384
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
Ответ 3
Если вы добавите --from-beginning
, это, скорее всего, даст вам некоторые результаты, по крайней мере, когда я попытался. И если вы не указали этот аргумент, но прочитали больше сообщений (и совершили смещение затухания), пока вы слушаете этого пользователя, это также должно отображать там сообщения.
Ответ 4
Начиная с Kafka 0.11, исходный код (Scala) можно найти здесь
Для тех, кому нужен перевод на Java, из любого процесса Потребителя, скажем, вы получаете ConsumerRecord<byte[], byte[]> consumerRecord
, и вы можете использовать
-
Получите ключ (проверьте, не первый ли ключ) и используйте GroupMetadataManager.readMessageKey(consumerRecord.key)
. Это может возвращать разные типы, поэтому проверьте if (... instanceof OffsetKey)
, затем произведите его, и вы можете получить от него различные значения.
-
Чтобы получить значение записи Kafka для смещений, вы можете использовать String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))
Минимальный пример Java, переведенный из кода Scala...
byte[] key = consumerRecord.key;
if (key != null) {
Object o = GroupMetadataManager.readMessageKey(key);
if (o != null && o instanceOf OffsetKey) {
OffsetKey offsetKey = (OffsetKey) o;
Object groupTopicPartition = offsetKey.key;
byte[] value = consumerRecord.value;
String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
// TODO: Print, store, or compute results with the new key and value
}
}
Обратите внимание, что также возможно использовать API-интерфейсы AdminClient для описания групп, а не для использования этих необработанных сообщений
Исходный код Scala extract
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
// Only print if the message is an offset record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case offsetKey: OffsetKey =>
val groupTopicPartition = offsetKey.key
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
Ответ 5
Для Kafka-2.X используйте следующую команду
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"