Spark: наиболее эффективный способ сортировки и разделения данных, которые должны быть записаны как паркет
Мои данные в принципе представляют собой таблицу, которая содержит столбец ID
и столбец GROUP_ID
, помимо других "данных".
На первом этапе я читаю CSV в Spark, делаю некоторую обработку для подготовки данных для второго шага и записываю данные как паркет.
На втором этапе много groupBy('GROUP_ID')
и Window.partitionBy('GROUP_ID').orderBy('ID')
.
Теперь цель - во избежание перетасовки на втором шаге - для эффективной загрузки данных на первом этапе, поскольку это один таймер.
Вопрос Часть 1: AFAIK, Spark сохраняет разбиение при загрузке с паркета (которое на самом деле является основой любого "оптимизированного рассмотрения записи" ) - правильно?
Я придумал три возможности:
-
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
-
df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
-
df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')
Я бы установил n
таким образом, чтобы отдельные файлы паркета составляли ~ 100 МБ.
Вопрос Часть 2: Правильно ли, что три варианта дают "одинаковые" /похожие результаты в отношении цели (избегайте перетасовки на втором шаге)? Если нет, в чем разница? И какой из них "лучше"?
Вопрос. Часть 3:. Какой из трех вариантов лучше работает в отношении шага 1?
Спасибо, что поделились своими знаниями!
EDIT 2017-07-24
После выполнения некоторых тестов (запись и чтение из паркета) кажется, что Spark не может восстановить partitionBy
и orderBy
информацию по умолчанию на втором шаге. Количество разделов (как получено из df.rdd.getNumPartitions()
, по-видимому, определяется количеством ядер и/или spark.default.parallelism
(если установлено), но не количеством паркетных разделов, поэтому ответ на вопрос 1 будет WRONG, а вопросы 2 и 3 будут неактуальны.
Итак, оказывается, что REAL QUESTION: есть ли способ сказать Spark, что данные уже разделены столбцом X и отсортированы по столбцу Y
Ответы
Ответ 1
Насколько я знаю, НЕТ, нет никакого способа прочитать данные с паркета и сказать Spark, что они уже разделены каким-либо выражением и упорядочены.
Короче говоря, один файл в HDFS и т.д. Слишком велик для одного раздела Spark. И даже если вы прочитаете весь файл в один раздел, играя со свойствами Parquet, такими как parquet.split.files=false
, parquet.task.side.metadata=true
и т.д., Будет больше затрат по сравнению с одним перемешиванием.
Ответ 2
Попробуйте bucketBy. Также может помочь обнаружение разделов.
Ответ 3
Возможно, вам будет интересна поддержка в Spark.
Подробности смотрите здесь https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html
large.write
.bucketBy(4, "id")
.sortBy("id")
.mode(SaveMode.Overwrite)
.saveAsTable(bucketedTableName)
Обратите внимание, что в Spark 2.4 добавлена поддержка bucket pruning
partition pruning
(например, partition pruning
).
Более прямая функциональность, на которую вы обращаете внимание - это таблицы с сортировкой по Hive https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables. Это еще не доступно в Spark (см. Раздел PS ниже) )
Также обратите внимание на то, что информация о сортировке не будет загружаться Spark автоматически, но поскольку данные уже отсортированы... операция сортировки на самом деле будет гораздо быстрее, чем выполнение большой работы - например, одна передача данных только для подтверждения того, что она уже отсортировано.
PS. Spark и Hive ведра немного отличаются. Это зонтичный билет для обеспечения совместимости в Spark для таблиц с пакетами, созданных в Hive - https://issues.apache.org/jira/browse/SPARK-19256