Ответ 1
У вас есть несколько вариантов. В моем коде ниже я предполагаю, что вы хотите написать в паркет, но, конечно, вы можете изменить это.
(1) df.repartition(numPartitions, * cols).write.partitionBy(* cols).parquet(writePath)
Сначала будет использоваться разделение на основе хеша, чтобы гарантировать, что ограниченное число значений из COL попадет в каждый раздел. В зависимости от значения, numPartitions
вами для numPartitions
, некоторые разделы могут быть пустыми, в то время как другие могут быть переполнены значениями - для тех, кто не знает почему, прочитайте это. Затем, когда вы вызываете partitionBy
в DataFrameWriter, каждое уникальное значение в каждом разделе будет помещено в отдельный файл.
Предупреждение: этот подход может привести к разным размерам разделов и временам выполнения односторонних задач. Это происходит, когда значения в вашем столбце связаны со многими строками (например, столбец города - в файле для Нью-Йорка может быть много строк), тогда как другие значения менее многочисленны (например, значения для небольших городов).
(2) df.sort(sortCols).write.parquet(writePath)
Эта опция прекрасно работает, когда вы хотите (1) файлы, которые вы пишете, иметь почти равные размеры (2) точный контроль над количеством записанных файлов. Этот подход сначала глобально сортирует ваши данные, а затем находит разбиения, которые разбивают данные на k
разделов равномерного размера, где k
указано в конфигурации config spark.sql.shuffle.partitions
. Это означает, что все значения с одинаковыми значениями вашего ключа сортировки смежны друг с другом, но иногда они разделяют разделение и находятся в разных файлах. Это, если ваш вариант использования требует, чтобы все строки с одинаковым ключом были в одном разделе, то не используйте этот подход.
Есть два дополнительных бонуса: (1) путем сортировки данных их размер на диске часто может быть уменьшен (например, сортировка всех событий по user_id, а затем по времени приведет к большому количеству повторений в значениях столбцов, что способствует сжатию) и (2 ) если вы записываете в формат файла, который поддерживает его (например, Parquet), то последующие читатели могут оптимально считывать данные с помощью предиката push-down, потому что средство записи паркета запишет значения MAX и MIN каждого столбца в метаданных, позволяя считыватель для пропуска строк, если в запросе указаны значения за пределами диапазона (min, max).
Обратите внимание, что сортировка в Spark обходится дороже, чем просто перераспределение и требует дополнительного этапа. За кулисами Spark сначала определяет разбиения на одном этапе, а затем перетасовывает данные в эти разбиения на другом этапе.
(3) df.rdd.partitionBy(customPartitioner).toDF(). Write.parquet(writePath)
Если вы используете spark в Scala, то вы можете написать клиентский разделитель, который сможет преодолеть надоедливые ошибки разделителя на основе хеша. К сожалению, не вариант в PySpark. Если вы действительно хотите написать собственный разделитель в pySpark, я обнаружил, что это возможно, хотя и немного неловко, используя rdd.repartitionAndSortWithinPartitions
:
df.rdd \
.keyBy(sort_key_function) \ # Convert to key-value pairs
.repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS,
partitionFunc=part_func) \
.values() # get rid of keys \
.toDF().write.parquet(writePath)
Может быть, кто-то еще знает более простой способ использовать пользовательский разделитель на фрейме данных в pyspark?