Сохранение подпалубного паркета
У меня есть структура каталогов на основе двух разделов, например:
People
> surname=Doe
> name=John
> name=Joe
> surname=White
> name=Josh
> name=Julien
Я читаю паркетные файлы с информацией только обо всех. И поэтому я напрямую, указав surname = Doe как выходной каталог для моего DataFrame. Теперь проблема заключается в том, что я пытаюсь добавить разбиение на основе имен с partitionBy("name")
при записи.
df.write.partitionBy("name").parquet(outputDir)
(outputDir содержит путь к директории Doe)
Это вызывает ошибку, как показано ниже:
Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
Partition column name list #0: surname, name
Partition column name list #1: surname
Любые советы по его решению? Вероятно, это происходит из-за файла _SUCCESS
, созданного в каталоге фамилии, который дает неправильные подсказки Spark - при удалении файлов _SUCCESS
и _metadata
Spark может читать все без каких-либо проблем.
Ответы
Ответ 1
Мне удалось решить его с помощью обходного пути - я не думаю, что это хорошая идея, но я отключил создание дополнительных файлов _SUCCESS и _metadata с помощью:
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Таким образом, Spark не получит глупых идей о структурах разбиения.
Еще одна опция - сохранить в "правильном" каталоге - "Люди" и "Разделить по фамилии и имени", но тогда вы должны иметь в виду, что единственный нормальный параметр: SaveMode
- Append
и вручную удаляет каталоги, которые вы ожидаем перезаписать (это действительно ошибка):
df.write.mode(SaveMode.Append).partitionBy("surname","name").parquet("/People")
Не используйте owerwrite SaveMode в этом случае - это приведет к удалению ВСЕХ названий фамилий.
Ответ 2
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
достаточно разумен, если у вас есть сводные метаданные, тогда запись файла метаданных может стать узким местом IO при чтении и записи.
Альтернативным способом решения может быть добавление .mode( "append" ) к вашей записи, но с исходным родительским каталогом в качестве адресата,
df.write.mode("append").partitionBy("name").parquet("/People")