Как эффективно перемещать данные с Kafka на таблицу Impala?
Вот шаги к текущему процессу:
- Flafka записывает журналы в "зону приземления" на HDFS.
- Задание, запланированное Oozie, копирует полные файлы из зоны приземления в промежуточную область.
- Данные промежуточного этапа являются "schema-ified" таблицей Hive, которая использует промежуточную область в качестве своего местоположения.
- Записи из промежуточной таблицы добавляются в постоянную таблицу Hive (например,
insert into permanent_table select * from staging_table
).
- Данные из таблицы Hive доступны в Impala, выполнив
refresh permanent_table
в Impala.
![существующий поток данных]()
Я смотрю на процесс, который я создал, и он "плохо пахнет": слишком много промежуточных шагов, которые ухудшают поток данных.
Около 20 месяцев назад я увидел демоверсию, в которой данные транслировались из трубки Amazon Kinesis, и была доступна в Impala в почти реальном времени. Я не думаю, что они сделали что-то довольно уродливое/запутанное. Есть ли более эффективный способ передачи данных от Kafka в Impala (возможно, потребитель Kafka, который может сериализоваться в Parquet)?
Я предполагаю, что "потоковая передача данных в SQL с низкой задержкой" должна быть довольно распространенным вариантом использования, и поэтому мне интересно узнать, как другие люди решили эту проблему.
Ответы
Ответ 1
Если вам нужно сбрасывать данные Kafka как есть в HDFS, лучшим вариантом является использование соединителя Kafka Connect и Confluent HDFS.
Вы можете либо сбросить данные в файл паркета на HDFS, который вы можете загрузить в Impala.
Вам нужно, я думаю, вы захотите использовать секвенсор TimeBasedPartitioner для создания паркетных файлов каждые X миллисекунд (настройка параметра конфигурации partition.duration.ms).
Добавьте что-то вроде этого в конфигурацию Kafka Connect, чтобы сделать трюк:
# Don't flush less than 1000 messages to HDFS
flush.size = 1000
# Dump to parquet files
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class = TimebasedPartitioner
# One file every hour. If you change this, remember to change the filename format to reflect this change
partition.duration.ms = 3600000
# Filename format
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm