Понимание физического плана искры
Я пытаюсь понять физические планы искры, но я не понимаю некоторых частей, потому что они кажутся отличными от традиционных rdbms. Например, в этом плане ниже, это план запроса по таблице улей. Запрос таков:
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
+- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
+- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
+- Filter (l_shipdate#37 <= 1998-09-16)
+- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None
Я понимаю, что в плане есть:
-
Сначала начинается сканирование таблицы Hive
-
Затем он фильтрует, используя условие
-
Затем проецируем нужные столбцы
-
Затем TungstenAggregate?
-
Затем TungstenExchange?
-
Затем снова вольфрамовый агрегат?
-
Затем ConvertToSafe?
-
Затем сортирует окончательный результат
Но я не понимаю шаги 4, 5, 6 и 7. Вы знаете, кто они? Я ищу информацию об этом, чтобы понять план, но я не нахожу ничего конкретного.
Ответы
Ответ 1
Давайте рассмотрим структуру используемого вами SQL-запроса:
SELECT
... -- not aggregated columns #1
... -- aggregated columns #2
FROM
... -- #3
WHERE
... -- #4
GROUP BY
... -- #5
ORDER BY
... -- #6
Как вы уже подозреваете:
-
Filter (...)
соответствует предикатам в WHERE
(#4
)
-
Project ...
ограничивает число столбцов теми, которые требуются объединением (#1
и #2
, и #4
/#6
, если нет в SELECT
)
-
HiveTableScan
соответствует разделу FROM
(#3
)
Остальные части могут быть отнесены следующим образом:
-
#2
из SELECT
- - functions
в поле TungstenAggregates
-
GROUP BY
(#4
):
-
TungstenExchange
/хэш-разбиение
-
key
поле в TungstenAggregates
-
#6
- ORDER BY
.
Проект Вольфрам в целом описывает набор оптимизаций, используемых Spark DataFrames
(- sets
), включая:
- явное управление памятью с помощью
sun.misc.Unsafe
. Это означает использование "родной" (вне кучи) памяти и явное выделение/освобождение памяти за пределами управления GC. Эти преобразования соответствуют шагам ConvertToUnsafe
/ConvertToSafe
в плане выполнения. Вы можете узнать некоторые интересные подробности о небезопасности из Общие сведения о sun.misc.Unsafe
- генерация кода - различные метапрограммные трюки, предназначенные для генерации кода, который лучше оптимизируется во время компиляции. Вы можете думать о нем как о внутреннем компиляторе Spark, который делает такие вещи, как переписывание хорошего функционального кода в уродливые для циклов.
Вы можете узнать больше о Вольфраме вообще от Project Tungsten: Приведение Apache Spark ближе к Bare Metal. Apache Spark 2.0: быстрее, проще и умнее содержит некоторые примеры генерации кода.
TungstenAggregate
происходит дважды, потому что данные сначала агрегируются локально на каждом разделе, а затем перетасовываются и, наконец, сливаются. Если вы знакомы с RDD API, этот процесс примерно эквивалентен reduceByKey
.
Если план выполнения нечеткий, вы также можете попытаться преобразовать полученный результат DataFrame
в RDD
и проанализировать вывод toDebugString
.
Ответ 2
Tungsten - новый движок памяти в Spark начиная с версии 1.4, который управляет данными за пределами JVM, чтобы сэкономить некоторые накладные расходы на GC. Вы можете себе представить, что это связано с копированием данных из JVM и в JVM. Это. В Spark 1.5 вы можете включить Tungsten через spark.sql.tungsten.enabled
, тогда вы увидите "старый" план, в Spark 1.6. Думаю, вы больше не можете его отключать.