Ответ 1
Даже я искал в Интернете, чтобы узнать, как искра вычисляет DAG из RDD и затем выполняет эту задачу.
На высоком уровне, когда любое действие вызывается в RDD, Spark создает DAG и отправляет его в планировщик DAG.
-
Планировщик DAG делит операторы на этапы задач. Этап состоит из задач, основанных на разделах входных данных. Операторы планировщика DAG вместе работают вместе. Напр. Многие операторы карты могут быть запланированы в один этап. Конечным результатом планировщика DAG является набор этапов.
-
Этапы передаются в планировщик заданий. Планировщик задач запускает задачи через диспетчер кластеров (Spark Standalone/Yarn/Mesos). Планировщик задач не знает о зависимостях этапов.
-
Рабочий выполняет задачи на ведомом.
Давайте рассмотрим, как Spark создает DAG.
На высоком уровне существуют два преобразования, которые могут быть применены к RDD, а именно узкое преобразование и широкое преобразование. Широкие преобразования в основном приводят к границам сцены.
Узкое преобразование - не требует перетасовки данных по разделам. например, Карта, фильтр и т.д.
широкое преобразование - требует, чтобы данные были перетасованы, например, reduceByKey и т.д.
Давайте рассмотрим пример подсчета количества сообщений журнала на каждом уровне серьезности,
Ниже приведен файл журнала, начинающийся с уровня серьезности,
INFO I'm Info message
WARN I'm a Warn message
INFO I'm another Info message
и создайте следующий scala код, чтобы извлечь то же самое,
val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
.map(words => (words(0), 1))
.reduceByKey{(a,b) => a + b}
Эта последовательность команд неявно определяет DAG объектов RDD (RDD lineage), которые будут использоваться позже, когда вызывается действие. Каждый RDD поддерживает указатель на одного или нескольких родителей вместе с метаданными о том, какой тип отношения он имеет с родителем. Например, когда мы вызываем val b = a.map()
на RDD, RDD b
сохраняет ссылку на родителя a
, что линия.
Чтобы отобразить линию RDD, Spark предоставляет метод отладки toDebugString()
. Например, выполнение toDebugString()
на splitedLines
RDD, выведет следующее:
(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[5] at map at <console>:24 []
| MapPartitionsRDD[4] at map at <console>:23 []
| log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
| log.txt HadoopRDD[0] at textFile at <console>:21 []
Первая строка (внизу) показывает входной RDD. Мы создали этот RDD, вызвав sc.textFile()
. Ниже приведен более схематический вид графика DAG, созданного с данного RDD.
После создания DAG планировщик Spark создает физический план выполнения. Как упоминалось выше, планировщик DAG разбивает график на несколько этапов, этапы создаются на основе преобразований. Узкие трансформации будут сгруппированы (с трубкой) вместе в один этап. Итак, для нашего примера Spark создаст двухэтапное выполнение следующим образом:
Затем планировщик DAG отправляет этапы в планировщик задач. Количество поставленных задач зависит от количества разделов, присутствующих в текстовом поле. Пример Fox рассмотрим, что в этом примере у нас есть 4 раздела, тогда будет создано 4 набора заданий, которые будут представлены параллельно, если будет достаточно ведомых/ядер. Ниже диаграмма иллюстрирует это более подробно:
Для получения более подробной информации я предлагаю вам просмотреть следующие видеоролики YouTube, где создатели Spark подробно расскажут о DAG и плане выполнения и времени жизни.