Какие факторы определяют количество исполнителей в автономном режиме?
Учитывая приложение Spark
-
Какие факторы определяют количество исполнителей в автономном режиме? В Mesos и YARN согласно этим документам мы можем указать количество исполнителей/ядер и памяти.
-
После запуска нескольких исполнителей. Запускает ли Spark задачи в циклическом режиме или достаточно умен, чтобы убедиться, что некоторые из исполнителей не работают/заняты, а затем планируют задачи соответственно.
-
Также, как Spark принимает решение о количестве задач? Я сделал написать простую программу максимальной температуры с небольшим набором данных, а Spark породила две задачи в одном исполнителе. Это находится в автономном режиме Spark.
Ответы
Ответ 1
Отвечая на ваши вопросы:
-
В автономном режиме используется одна и та же переменная конфигурации, как режимы Mesos и Yarn, чтобы установить количество исполнителей. Переменная spark.cores.max
определяет максимальное количество ядер, используемых в искровом контексте. Значение по умолчанию - бесконечность, поэтому Spark будет использовать все ядра в кластере. Переменная spark.task.cpus определяет, сколько процессоров Spark будет выделяться для одной задачи, значение по умолчанию равно 1. С помощью этих двух переменных вы можете определить максимальное количество параллельных задач в вашем кластере.
-
При создании подкласса RDD вы можете определить, в каких машинах запускается ваша задача. Это определено в методе getPreferredLocations
. Но поскольку сигнатуры метода позволяют предположить, что это только предпочтение, поэтому, если Spark обнаруживает, что одна машина не занята, она запустит задачу на этой незанятой машине. Однако я не знаю, какой механизм Spark знает, какие машины простаивают. Чтобы достичь локальности, мы (Stratio) решили сделать каждый Partions меньше, поэтому задача занимает меньше времени и достигает локальности.
-
Количество задач каждой операции Spark определяется в соответствии с длиной разделов RDD. Этот вектор является результатом метода getPartitions, который вы должны переопределить, если вы хотите создать новый подкласс класса RDD. Этот метод возвращает способ разделения RDD, где находится информация, и разделов. Когда вы присоединяетесь к двум или более RDD, используя, например, операции объединения или объединения, количество задач результирующего RDD - это максимальное количество задач RDD, участвующих в операции. Например: если вы присоединяетесь к RDD1, у которого есть 100 задач и RDD2, у которых есть 1000 задач, следующая операция результирующего RDD будет иметь 1000 задач. Обратите внимание, что большое количество разделов не обязательно является синонимом большего количества данных.
Надеюсь, это поможет.
Ответ 2
Я согласен с @jlopezmat о том, как Spark выбирает свою конфигурацию. Что касается вашего тестового кода, вы видите две задачи из-за реализации textFile
. От SparkContext.scala
:
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
и если мы проверим, что такое значение defaultMinPartitions
:
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
Ответ 3
Spark выбирает количество задач на основе количества разделов в исходном наборе данных. Если вы используете HDFS в качестве источника данных, то по умолчанию количество разделов, равное количеству блоков HDFS. Вы можете изменить количество разделов несколькими способами. Две верхние: как дополнительный аргумент метода SparkContext.textFile
; вызвав метод RDD.repartion
.
Ответ 4
Отвечая на некоторые вопросы, которые не были рассмотрены в предыдущих ответах:
-
в автономном режиме вам нужно сыграть с --executor-cores
и --max-executor-cores
, чтобы установить количество исполнителей, которые будут запущены (если у вас достаточно памяти для этого номера, если вы укажете --executor-memory
)
-
Spark не распределяет задачу циклически, он использует механизм, называемый Delay Scheduling", который является позволяя каждому исполнителю предлагать ему доступность мастеру, который будет решать, отправлять или не отправлять на него задачу.