AWS-клей: как обрабатывать вложенные JSON с различными схемами
Цель:
Мы надеемся использовать каталог данных AWS Glue для создания единой таблицы для данных JSON, находящихся в корзине S3, которую мы затем запросим и проанализируем с помощью Redshift Spectrum.
Справочная информация:
Данные JSON взяты из DynamoDB Streams и глубоко вложены. Первый уровень JSON имеет согласованный набор элементов: Keys, NewImage, OldImage, SequenceNumber, ApproximateCreationDateTime, SizeBytes и EventName. Единственное изменение состоит в том, что некоторые записи не имеют NewImage, а некоторые не имеют OldImage. Однако ниже этого первого уровня схема широко варьируется.
В идеале, мы хотели бы использовать Glue только для анализа этого первого уровня JSON и в основном обрабатывать нижние уровни как большие объекты STRING (которые мы затем анализируем при необходимости с помощью Redshift Spectrum). В настоящее время мы загружаем всю запись в один столбец VARCHAR в Redshift, но записи приближаются к максимальному размеру для типа данных в Redshift (максимальная длина VARCHAR составляет 65535). В результате мы хотели бы выполнить этот первый уровень анализа до того, как записи появятся в Redshift.
На что мы уже пробовали/ссылались:
- При указании AWS Glue Crawler в корзину S3 получаются сотни таблиц с согласованной схемой верхнего уровня (перечисленные выше атрибуты), но с разными схемами на более глубоких уровнях в элементах STRUCT. Мы не нашли способ создать задание Glue ETL, которое бы считывало все эти таблицы и загружало его в одну таблицу.
- Создание таблицы вручную не было плодотворным. Мы попытались установить для каждого столбца тип данных STRING, но заданию не удалось загрузить данные (предположительно, поскольку это потребовало бы некоторого преобразования из STRUCT в STRING). При установке столбцов в STRUCT требуется определенная схема, но это именно то, что варьируется от одной записи к другой, поэтому мы не можем предоставить общую схему STRUCT, которая работает для всех рассматриваемых записей.
- AWS Glue Relationalize transform является интригующим, но не тем, что мы ищем в этом сценарии (поскольку мы хотим сохранить часть JSON нетронутой, а не сгладить ее полностью). Redshift Spectrum поддерживает скалярные данные JSON пару недель назад, но это не работает с вложенным JSON, с которым мы имеем дело. Похоже, что ни один из них не помогает в обработке сотен таблиц, созданных программой Clue Crawler.
Вопрос:
Как бы мы использовали Glue (или какой-то другой метод), чтобы позволить нам анализировать только первый уровень этих записей - игнорируя изменяющиеся схемы под элементами на верхнем уровне - чтобы мы могли получить к нему доступ из Spectrum или загрузить его физически в RedShift?
Я новичок в Клее. Я потратил довольно много времени на документацию Glue и просматривал (несколько редкую) информацию на форумах. Я мог упустить что-то очевидное - или, возможно, это ограничение клея в его нынешнем виде. Любые рекомендации приветствуются.
Спасибо!
Ответы
Ответ 1
Я не уверен, что вы можете сделать это с помощью определения таблицы, но вы можете выполнить это с помощью задания ETL, используя функцию отображения для приведения значений верхнего уровня в виде строк JSON. Документация: [ссылка]
import json
# Your mapping function
def flatten(rec):
for key in rec:
rec[key] = json.dumps(rec[key])
return rec
old_df = glueContext.create_dynamic_frame.from_options(
's3',
{"paths": ['s3://...']},
"json")
# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)
Отсюда у вас есть возможность экспортировать в S3 (возможно, в Parquet или каком-либо другом столбчатом формате для оптимизации запросов) или напрямую в Redshift из моего понимания, хотя я не пробовал.
Ответ 2
С 20.12.2008 я смог вручную определить таблицу с полями json первого уровня в виде столбцов с типом STRING. Затем в сценарии склеивания динамический кадр имеет столбец в виде строки. Оттуда вы можете выполнить операцию Unbox
типа json
над полями. Это json проанализирует поля и выведет реальную схему. Объединение Unbox
с Filter
позволяет вам циклически проходить и обрабатывать гетерогенные схемы Unbox
с одного и того же входа, если вы можете циклически проходить через список схем.
Однако, одно слово предостережения, это невероятно медленно. Я думаю, что клей загружает исходные файлы из s3 во время каждой итерации цикла. Я пытался найти способ сохранить исходные данные источника, но похоже, что .toDF
выводит схему строковых полей json, даже если вы указываете их как клей StringType. Я добавлю здесь комментарий, если смогу найти решение с лучшей производительностью.
Ответ 3
На данный момент это ограничение Клее. Вы взглянули на классификаторы клея? Это единственная вещь, которую я еще не использовал, но мог бы удовлетворить ваши потребности. Вы можете определить путь JSON для поля или что-то в этом роде.
Помимо этого - Клей Джобс - это путь. Это искры в фоновом режиме, поэтому вы можете делать почти все. Настройте конечную точку разработки и поиграйте с ней. Я столкнулся с различными препятствиями в течение последних трех недель и решил полностью отказаться от всех функциональных возможностей Glue и только Spark, так что он переносится и фактически работает.
Одна вещь, которую вам, возможно, стоит иметь в виду, когда настройка конечной точки dev заключается в том, что для роли IAM должен быть путь "/", поэтому вам, вероятно, потребуется создать отдельную роль вручную, которая имеет этот путь. У созданного автоматически создается путь /service-role/.
Ответ 4
вы должны добавить классификатор клея, предпочтительно $ [*]
Когда вы сканируете json файл в s3, он будет читать первую строку файла.
Вы можете создать клей-задание, чтобы загрузить таблицу каталога данных этого json файла в красное смещение.
Моя единственная проблема здесь в том, что у Redshift Spectrum есть проблемы с чтением json-таблиц в каталоге данных.
сообщите мне, нашли ли вы решение
Ответ 5
Процедура, которую я нашел полезной для мелкого вложенного json:
-
ApplyMapping для первого уровня как datasource0
;
-
Взорвать struct
или array
объектов, чтобы избавиться от уровня элемента df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln)
, где explode
требует from pyspark.sql.functions import explode
;
-
Выберите объекты JSON, которые вы хотели бы сохранить неизменными, с помощью intact_json = df1.select(id, itct1, itct2,..., itctm)
;
-
Преобразование df1
обратно в dynamicFrame и Relationalize dynamicFrame, а также удаление неповрежденных столбцов с помощью dataframe.drop_fields(itct1, itct2,..., itctm)
;
-
Соедините реляционную таблицу с неповрежденной таблицей на основе столбца 'id'.