Как изменить размер раздела в Spark SQL
У меня есть требование для загрузки данных из таблицы Hive с использованием spark-SQL HiveContext
и загрузки в HDFS. По умолчанию вывод DataFrame
из SQL имеет 2 раздела. Чтобы получить больше parallelism, мне нужно больше разделов из SQL. В HiveContext нет перегруженного метода, чтобы принять число параметров разделов.
Перераспределение RDD вызывает перетасовку и приводит к большему времени обработки.
val result = sqlContext.sql("select * from bt_st_ent")
Вывод журнала:
Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)
Я хотел бы знать, есть ли способ увеличить размер разделов вывода sql.
Ответы
Ответ 1
Искры <2.0:
Вы можете использовать параметры конфигурации Hadoop:
-
mapred.min.split.size
. -
mapred.max.split.size
а также размер блока HDFS для управления размером раздела для форматов на базе файловой системы *.
val minSplit: Int = ???
val maxSplit: Int = ???
sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)
Искра 2. 0+:
Вы можете использовать конфигурацию spark.sql.files.maxPartitionBytes
:
spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)
В обоих случаях эти значения могут не использоваться конкретным API-интерфейсом источника данных, поэтому вы всегда должны проверять детали документации/реализации используемого вами формата.
* Другие форматы ввода могут использовать разные настройки. См. Например
Кроме того, Datasets
созданные из RDDs
, наследуют расположение разделов от их родителей.
Подобным образом таблицы в квадратных скобках будут использовать компоновку ковша, определенную в метасторе, с соотношением 1:1 между ведром и разделом Dataset
.
Ответ 2
Очень распространенная и болезненная проблема. Вы должны искать ключ, который распределяет данные в единых разделах. Вы можете использовать операторы DISTRIBUTE BY
и CLUSTER BY
, чтобы указать искрам, чтобы группировать строки в разделе. Это вызовет некоторые накладные расходы на самом запросе. Но это приведет к раздельным размерам разделов. Deepsense имеет очень хорошее руководство по этому вопросу.
Ответ 3
Если ваш SQL выполняет перетасовку (например, у него есть соединение или какая-то группа), вы можете установить количество разделов, установив свойство spark.sql.shuffle.partitions
sqlContext.setConf( "spark.sql.shuffle.partitions", 64)
Следуя тому, что предлагает Фокко, вы можете использовать случайную переменную для кластера.
val result = sqlContext.sql("""
select * from (
select *,random(64) as rand_part from bt_st_ent
) cluster by rand_part""")