Получение числа видимых узлов в PySpark

Я выполняю некоторые операции в PySpark и недавно увеличил количество узлов в моей конфигурации (которая находится на Amazon EMR). Однако, хотя я увеличил число узлов (с 4 до 12) в три раза, производительность, похоже, не изменилась. Таким образом, я хотел бы видеть, видны ли новые узлы Spark.

Я вызываю следующую функцию:

sc.defaultParallelism
>>>> 2

Но я думаю, что это говорит мне общее количество задач, распределенных для каждого node, а не общее количество кодов, которые может видеть Spark.

Как мне узнать количество узлов, которые PySpark использует в моем кластере?

Ответы

Ответ 1

sc.defaultParallelism - это всего лишь намек. В зависимости от конфигурации он может не иметь отношения к числу узлов. Это количество разделов, если вы используете операцию, которая принимает аргумент подсчета разделов, но вы не предоставляете его. Например, sc.parallelize сделает новый RDD из списка. Вы можете сказать, сколько разделов необходимо создать в RDD со вторым аргументом. Но значением по умолчанию для этого аргумента является sc.defaultParallelism.

Вы можете получить количество исполнителей с sc.getExecutorMemoryStatus в API Scala, но это не отображается в API Python.

В целом рекомендация состоит в том, чтобы иметь в 4 раза больше разделов в RDD, так как у вас есть исполнители. Это хороший совет, потому что, если есть разница в том, сколько времени на выполнение этих задач будет даже оно. Некоторые исполнители будут обрабатывать 5 более быстрых задач, в то время как другие обрабатывают 3 более медленные задачи, например.

Вам не нужно быть очень точным с этим. Если у вас есть приблизительная идея, вы можете пойти с оценкой. Например, если вы знаете, что у вас менее 200 процессоров, вы можете сказать, что 500 разделов будут в порядке.

Итак, попробуйте создать RDD с таким количеством разделов:

rdd = sc.parallelize(data, 500)     # If distributing local data.
rdd = sc.textFile('file.csv', 500)  # If loading data from a file.

Или переразделите RDD перед вычислением, если вы не контролируете создание RDD:

rdd = rdd.repartition(500)

Вы можете проверить количество разделов в RDD с помощью rdd.getNumPartitions().

Ответ 2

В pyspark вы все равно можете вызвать API scala getExecutorMemoryStatus, используя мост pyspark py4j:

sc._jsc.sc().getExecutorMemoryStatus().size()

Ответ 3

Я обнаружил, что иногда мои сеансы были убиты удаленным, давая странную ошибку Java

Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

Я избегал этого следующим

def check_alive(spark_conn):
    """Check if connection is alive. ``True`` if alive, ``False`` if not"""
    try:
        get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus()
        return True
    except Exception:
        return False

def get_number_of_executors(spark_conn):
    if not check_alive(spark_conn):
        raise Exception('Unexpected Error: Spark Session has been killed')
    try:
        return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
    except:
        raise Exception('Unknown error')

Ответ 4

Другие ответы предоставляют способ получить количество исполнителей. Вот способ получить количество узлов. Сюда входят головные и рабочие узлы.

s = sc._jsc.sc().getExecutorMemoryStatus().keys()
l = str(s).replace("Set(","").replace(")","").split(", ")

d = set()
for i in l:
    d.add(i.split(":")[0])
len(d)  

Ответ 5

Должно быть возможно получить количество узлов в кластере, используя это (аналогично методу @Dan выше, но короче и работает лучше!).

sc._jsc.sc().getExecutorMemoryStatus().keySet().size()