AWS-клей: как обрабатывать вложенные JSON с различными схемами

Цель: Мы надеемся использовать каталог данных AWS Glue для создания единой таблицы для данных JSON, находящихся в корзине S3, которую мы затем запросим и проанализируем с помощью Redshift Spectrum.

Справочная информация: Данные JSON взяты из DynamoDB Streams и глубоко вложены. Первый уровень JSON имеет согласованный набор элементов: Keys, NewImage, OldImage, SequenceNumber, ApproximateCreationDateTime, SizeBytes и EventName. Единственное изменение состоит в том, что некоторые записи не имеют NewImage, а некоторые не имеют OldImage. Однако ниже этого первого уровня схема широко варьируется.

В идеале, мы хотели бы использовать Glue только для анализа этого первого уровня JSON и в основном обрабатывать нижние уровни как большие объекты STRING (которые мы затем анализируем при необходимости с помощью Redshift Spectrum). В настоящее время мы загружаем всю запись в один столбец VARCHAR в Redshift, но записи приближаются к максимальному размеру для типа данных в Redshift (максимальная длина VARCHAR составляет 65535). В результате мы хотели бы выполнить этот первый уровень анализа до того, как записи появятся в Redshift.

На что мы уже пробовали/ссылались:

  • При указании AWS Glue Crawler в корзину S3 получаются сотни таблиц с согласованной схемой верхнего уровня (перечисленные выше атрибуты), но с разными схемами на более глубоких уровнях в элементах STRUCT. Мы не нашли способ создать задание Glue ETL, которое бы считывало все эти таблицы и загружало его в одну таблицу.
  • Создание таблицы вручную не было плодотворным. Мы попытались установить для каждого столбца тип данных STRING, но заданию не удалось загрузить данные (предположительно, поскольку это потребовало бы некоторого преобразования из STRUCT в STRING). При установке столбцов в STRUCT требуется определенная схема, но это именно то, что варьируется от одной записи к другой, поэтому мы не можем предоставить общую схему STRUCT, которая работает для всех рассматриваемых записей.
  • AWS Glue Relationalize transform является интригующим, но не тем, что мы ищем в этом сценарии (поскольку мы хотим сохранить часть JSON нетронутой, а не сгладить ее полностью). Redshift Spectrum поддерживает скалярные данные JSON пару недель назад, но это не работает с вложенным JSON, с которым мы имеем дело. Похоже, что ни один из них не помогает в обработке сотен таблиц, созданных программой Clue Crawler.

Вопрос: Как бы мы использовали Glue (или какой-то другой метод), чтобы позволить нам анализировать только первый уровень этих записей - игнорируя изменяющиеся схемы под элементами на верхнем уровне - чтобы мы могли получить к нему доступ из Spectrum или загрузить его физически в RedShift?

Я новичок в Клее. Я потратил довольно много времени на документацию Glue и просматривал (несколько редкую) информацию на форумах. Я мог упустить что-то очевидное - или, возможно, это ограничение клея в его нынешнем виде. Любые рекомендации приветствуются.

Спасибо!

Ответы

Ответ 1

Я не уверен, что вы можете сделать это с помощью определения таблицы, но вы можете выполнить это с помощью задания ETL, используя функцию отображения для приведения значений верхнего уровня в виде строк JSON. Документация: [ссылка]

import json

# Your mapping function
def flatten(rec):
    for key in rec:
        rec[key] = json.dumps(rec[key])
    return rec

old_df = glueContext.create_dynamic_frame.from_options(
    's3',
    {"paths": ['s3://...']},
    "json")

# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)

Отсюда у вас есть возможность экспортировать в S3 (возможно, в Parquet или каком-либо другом столбчатом формате для оптимизации запросов) или напрямую в Redshift из моего понимания, хотя я не пробовал.

Ответ 2

С 20.12.2008 я смог вручную определить таблицу с полями json первого уровня в виде столбцов с типом STRING. Затем в сценарии склеивания динамический кадр имеет столбец в виде строки. Оттуда вы можете выполнить операцию Unbox типа json над полями. Это json проанализирует поля и выведет реальную схему. Объединение Unbox с Filter позволяет вам циклически проходить и обрабатывать гетерогенные схемы Unbox с одного и того же входа, если вы можете циклически проходить через список схем.

Однако, одно слово предостережения, это невероятно медленно. Я думаю, что клей загружает исходные файлы из s3 во время каждой итерации цикла. Я пытался найти способ сохранить исходные данные источника, но похоже, что .toDF выводит схему строковых полей json, даже если вы указываете их как клей StringType. Я добавлю здесь комментарий, если смогу найти решение с лучшей производительностью.

Ответ 3

На данный момент это ограничение Клее. Вы взглянули на классификаторы клея? Это единственная вещь, которую я еще не использовал, но мог бы удовлетворить ваши потребности. Вы можете определить путь JSON для поля или что-то в этом роде.

Помимо этого - Клей Джобс - это путь. Это искры в фоновом режиме, поэтому вы можете делать почти все. Настройте конечную точку разработки и поиграйте с ней. Я столкнулся с различными препятствиями в течение последних трех недель и решил полностью отказаться от всех функциональных возможностей Glue и только Spark, так что он переносится и фактически работает.

Одна вещь, которую вам, возможно, стоит иметь в виду, когда настройка конечной точки dev заключается в том, что для роли IAM должен быть путь "/", поэтому вам, вероятно, потребуется создать отдельную роль вручную, которая имеет этот путь. У созданного автоматически создается путь /service-role/.

Ответ 4

вы должны добавить классификатор клея, предпочтительно $ [*]

Когда вы сканируете json файл в s3, он будет читать первую строку файла.

Вы можете создать клей-задание, чтобы загрузить таблицу каталога данных этого json файла в красное смещение.

Моя единственная проблема здесь в том, что у Redshift Spectrum есть проблемы с чтением json-таблиц в каталоге данных.

сообщите мне, нашли ли вы решение

Ответ 5

Процедура, которую я нашел полезной для мелкого вложенного json:

  1. ApplyMapping для первого уровня как datasource0;

  2. Взорвать struct или array объектов, чтобы избавиться от уровня элемента df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln), где explode требует from pyspark.sql.functions import explode;

  3. Выберите объекты JSON, которые вы хотели бы сохранить неизменными, с помощью intact_json = df1.select(id, itct1, itct2,..., itctm);

  4. Преобразование df1 обратно в dynamicFrame и Relationalize dynamicFrame, а также удаление неповрежденных столбцов с помощью dataframe.drop_fields(itct1, itct2,..., itctm);

  5. Соедините реляционную таблицу с неповрежденной таблицей на основе столбца 'id'.