Работа над проблемами производительности и памяти с помощью spark-sql GROUP BY
Рассмотрим следующий пример запуска GROUP BY
с относительно большим числом агрегатов и относительно большим количеством групп:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
Ввод этого задания - всего 8 МБ, выход - около 2,4 ГБ, и я запускаю его на кластере с тремя рабочими машинами с 61 ГБ памяти. Результат: все рабочие сбой с исключениями OutOfMemory.
Даже с более низкими значениями для num_columns
задание становится неоправданно медленным из-за накладных расходов GC.
Мы попытались включить:
- уменьшение размера раздела (уменьшает объем памяти, но увеличивает накладные расходы)
- предварительная разбивка данных с помощью HashPartitioner перед выполнением агрегации (уменьшает потребление памяти, но требует полной перестановки до того, как произойдет какая-либо реальная работа)
Есть ли лучшие способы достижения желаемого эффекта?
Ответы
Ответ 1
В целом, почти универсальное решение таких проблем, как это, состоит в том, чтобы сохранить размер раздела в разумных размерах. Хотя "разумный" слегка субъективен и может варьироваться от случая к случаю, 100-200 МБ выглядит как хорошее место для начала.
Я могу легко объединить данные примера, предоставленные вами одному работнику, сохраняя по умолчанию spark.executor.memory
(1 ГБ) и ограничивая общие доступные ресурсы до 8 ядер и 8 ГБ ОЗУ. Все это, используя 50 разделов и сохраняя время агрегации около 3 секунд без какой-либо специальной настройки (это более или менее согласовано между 1.5.2 и 2.0.0).
Итак, суммируем: либо увеличиваем spark.default.parallelism
, либо явно устанавливаем количество разделов при создании DataFrame
, если это возможно. По умолчанию spark.sql.shuffle.partitions
должно быть достаточно для небольшого набора данных, подобного этому.
Ответ 2
Как я не уверен, какую функцию агрегации вы используете, трудно сказать, что искра делает в фоновом режиме. В любом случае, чтобы иметь больше контроля над каждой функцией агрегации, я бы выполнил преобразование reduceByKey для каждого из них на базовом RDD. Затем вы можете просто объединить результаты, если это необходимо. таким образом, у вас больше контроля и вы можете увидеть, какая из этих агрегатов "стоит" вам больше всего, плюс вы можете избежать групповой операции, которая наряду с перетасовкой может также вызвать проблемы с памятью (из-за движения целых наборов данные в один раздел). Ниже приведена краткая иллюстрация, где aggrigationFunctions - это список ваших функций агрегации с их идентификатором и фактической функцией (список кортежей).
val aggrigationResults = aggrigationFunctions.map(
f => {
val aggRes = baseRdd
.map(x => (x.[the field to group by], x.[the value to aggrigate]))
.reduceByKey(f.func)
(f.id, aggRes)
}
)