Spark SQL - Разница между df.repartition и DataFrameWriter partitionBy?

В чем разница между методами DataFrame repartition() и DataFrameWriter partitionBy()?

Я надеюсь, что оба используются для "разделения данных на основе столбца dataframe"? Или есть какая-то разница?

Ответы

Ответ 1

Если вы запустите repartition(COL), вы измените разделение во время вычислений - вы получите spark.sql.shuffle.partitions (по умолчанию: 200) разделы. Если вы затем позвоните .write, вы получите один каталог со многими файлами.

Если вы запустите .write.partitionBy(COL), то в результате вы получите столько каталогов, сколько уникальных значений в COL. Это ускоряет дальнейшее чтение данных (если вы фильтруете по столбцам секционирования) и экономит некоторое место в хранилище (столбец секционирования удаляется из файлов данных).

ОБНОВЛЕНИЕ: см. ответ @conradlee. Он подробно объясняет не только, как будет выглядеть структура каталогов после применения различных методов, но также то, как будет получено количество файлов в обоих сценариях.

Ответ 2

Остерегайтесь: я считаю, что принятый ответ не совсем прав! Я рад, что вы задаете этот вопрос, потому что поведение этих функций с одинаковыми именами отличается важными и неожиданными способами, которые недостаточно хорошо документированы в официальной документации по иску.

Первая часть принятого ответа верна: вызов df.repartition(COL, numPartitions=k) создаст фрейм с разделами k, используя секционирующий хэш. COL здесь определяет ключ разделения - это может быть один столбец или список столбцов. Разделитель на основе хэша принимает каждый ключ раздела строки ввода, хэширует его в пространство разделов k через что-то вроде partition = hash(partitionKey) % k. Это гарантирует, что все строки с одним и тем же ключом разделяются на один раздел. Однако строки из нескольких ключей разделов также могут оказаться в одном разделе (когда происходит хеш-столкновение между ключами раздела), а некоторые разделы могут быть пустыми.

Таким образом, неинтуитивные аспекты df.repartition(COL, numPartitions=k) заключаются в том, что

  • разделы не будут строго разделять ключи разделов
  • некоторые из ваших разделов k могут быть пустыми, тогда как другие могут содержать строки из нескольких разделов.

Поведение df.write.partitionBy совершенно иное, так что многие пользователи не ожидают. Скажем, что вы хотите, чтобы ваши выходные файлы были разделены по дате, а ваши данные занимают более 7 дней. Пусть также предположим, что df имеет 10 разделов. Когда вы запускаете df.write.partitionBy('day'), сколько выходных файлов вы должны ожидать? Ответ "это зависит". Если каждый раздел ваших начальных разделов в df содержит данные с каждого дня, тогда ответ равен 70. Если каждый из ваших начальных разделов в df содержит данные ровно через один день, тогда ответ будет равен 10.

Как мы можем объяснить это поведение? Когда вы запускаете df.write, каждый из исходных разделов в df записывается независимо. То есть каждый из ваших первоначальных 10 разделов подразделяется отдельно на столбец "день", а отдельный файл записывается для каждого подраздела.

Я нахожу это поведение довольно раздражающим и желаю, чтобы был способ сделать глобальное перераспределение при записи данных.

Ответ 3

repartition() используется для разделения данных в памяти, а partitionBy используется для разделения данных на диске. Они часто используются вместе, как описано в этом сообщении в блоге.

И repartition(), и partitionBy могут использоваться для "разделения данных на основе столбца данных", но repartition() разделяет данные в памяти, а partitionBy разделяет данные на диске.

передел()

Давайте поиграем с кодом, чтобы лучше понять разбиение. Предположим, у вас есть следующие данные CSV.

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

df.repartition(col("country")) перераспределит данные по странам в памяти.

Давайте выпишем данные, чтобы мы могли проверить содержимое каждого раздела памяти.

val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
  .write
  .csv(outputPath)

Вот как данные записываются на диск:

partitioned_by_country/
  part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv

Каждый файл содержит данные для одной страны - например, файл part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv содержит данные по Китаю:

Bruce,Lee,China
Jack,Ma,China

partitionBy()

Давайте запишем данные на диск с помощью partitionBy и посмотрим, чем отличаются выходные данные файловой системы.

Вот код для записи данных на разделы диска.

val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
  .write
  .partitionBy("country")
  .csv(outputPath)

Вот как выглядят данные на диске:

partitionedBy_disk/
  country=Argentina/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
  country=China/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
  country=Russia/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000

Зачем разделять данные на диск?

Разделение данных на диске может сделать некоторые запросы намного быстрее, как описано в этом сообщении в блоге.