Apache Kafka с Avro и Schema Repo - где в сообщении идёт идентификатор схемы?

Я хочу использовать Avro для сериализации данных для своих сообщений Kafka и хотел бы использовать его в репозитории схемы Avro, поэтому мне не нужно включать схему с каждым сообщением.

Использование Avro с Kafka кажется популярной задачей, а также множество вопросов по блогам/вопросам /usergroups и т.д., отправляя идентификатор схемы с сообщением, но я не могу найти фактический пример того, куда он должен идти.

Я думаю, что он должен идти в заголовок сообщения Kafka где-то, но я не могу найти очевидное место. Если бы это было в сообщении Avro, вам нужно было бы декодировать его по схеме, чтобы получить содержимое сообщения и открыть схему, которую нужно декодировать, что имеет очевидные проблемы.

Я использую клиент С#, но пример на любом языке был бы замечательным. Класс сообщения имеет следующие поля:

public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }

но не все они кажутся правильными. MessageMetaData имеет только Offset и PartitionId.

Итак, где должен быть идентификатор схемы Avro?

Ответы

Ответ 1

Идентификатор схемы фактически закодирован в самом сообщении avro. Посмотрите этот, чтобы увидеть, как реализованы кодеры/декодеры.

В общем, что происходит, когда вы отправляете сообщение Avro в Kafka:

  • Кодер получает схему от подлежащего кодированию объекта.
  • Encoder запрашивает реестр схемы для идентификатора для этой схемы. Если схема уже зарегистрирована, вы получите существующий идентификатор, если нет - реестр зарегистрирует схему и вернет новый идентификатор.
  • Объект закодирован следующим образом: [magic byte] [идентификатор схемы] [фактическое сообщение], где магический байт - это только 0x0 байт, который используется для различения таких сообщений, идентификатор схемы - это 4-байтовое целочисленное значение остальное - это фактическое закодированное сообщение.

Когда вы декодируете сообщение здесь, что происходит:

  • Декодер считывает первый байт и гарантирует, что он 0x0.
  • Декодер считывает следующие 4 байта и преобразует их в целочисленное значение. Вот как декодируется идентификатор схемы.
  • Теперь, когда декодер имеет идентификатор схемы, он может запросить реестр схемы для фактической схемы для этого идентификатора. Вуаля!

Если ваш ключ закодирован в Avro, ваш ключ будет иметь формат, описанный выше. То же самое относится и к значению. Таким образом, ваш ключ и значение могут быть как значениями Avro, так и использовать разные схемы.

Изменить, чтобы ответить на вопрос в комментарии:

Фактическая схема хранится в репозитории схемы (на самом деле это весь репозиторий схемы - для хранения схем:)). Формат Avro Object Container Files не имеет ничего общего с форматом, описанным выше. KafkaAvroEncoder/Decoder использует немного другой формат сообщения (но фактические сообщения кодируются точно так же, как и все).

Основное различие между этими форматами заключается в том, что файлы Container Contain несут фактическую схему и могут содержать несколько сообщений, соответствующих этой схеме, тогда как формат, описанный выше, содержит только идентификатор схемы и ровно одно сообщение, соответствующее этой схеме.

Передача сообщений, связанных с контейнером в виде файла-контейнера, вероятно, будет неочевидна, чтобы следовать/поддерживать, потому что одно сообщение Kafka будет содержать несколько сообщений Avro. Или вы можете убедиться, что одно сообщение Kafka содержит только одно сообщение Avro, но это приведет к переносу схемы с каждым сообщением.

Схемы Avro могут быть довольно большими (я видел схемы, такие как 600 КБ и более), и перенос схемы с каждым сообщением был бы очень дорогостоящим и расточительным, так что там, где репозиторий схемы запускается - схема извлекается только один раз и получает кеширование локально, и все другие поисковые запросы - это просто быстрый поиск по карте.