Spark использует только один рабочий компьютер, когда доступно больше
Я пытаюсь распараллелить задачу прогнозирования машинного обучения через Spark. Я использовал Spark несколько раз ранее в других задачах и раньше не сталкивался с проблемой распараллеливания.
В этой конкретной задаче мой кластер имеет 4 рабочих. Я вызываю mapPartitions на RDD с 4 разделами. Функция map загружает модель с диска (bootstrap script распределяет все, что необходимо для этого, я проверял ее на каждом подчиненном компьютере) и выполняет прогнозирование точек данных в разделе RDD.
Код запускается, но используется только один исполнитель. Журналы для других исполнителей говорят "Выключенный крюк называется". При разных запусках кода он использует разные машины, но только по одному.
Как я могу заставить Spark использовать сразу несколько машин?
Я использую PySpark на Amazon EMR через ноутбук Zeppelin. Ниже приведены фрагменты кода.
%spark.pyspark
sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")
from ModelLoader import ModelLoader
from MyClassifier import MyClassifier
def load_models():
models_path = '/home/hadoop/models'
model_loader = ModelLoader(models_path)
models = model_loader.load_models()
return models
def process_file(file_contents, models):
filename = file_contents[0]
filetext = file_contents[1]
pred = MyClassifier.predict(filetext, models)
return (filename, pred)
def process_partition(file_list):
models = load_models()
for file_contents in file_list:
pred = process_file(file_contents, models)
yield pred
all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')
Есть четыре задачи, как ожидалось, но все они работают на одном и том же исполнителе!
Я запускаю кластер и могу предоставить журналы, доступные в диспетчере ресурсов. Я еще не знаю, где искать.
Ответы
Ответ 1
Процесс имеет столько же разделов, сколько вы указали, но он идет сериализованным способом.
Исполнители
Процесс может увеличивать число исполнителей по умолчанию. Это можно увидеть в менеджере ресурсов пряжи. В вашем случае вся обработка выполняется одним исполнителем. Если исполнитель имеет более одного ядра, он будет parellize задания. В emr вы делаете это, чтобы иметь более 1 ядро для исполнителя.
Что конкретно происходит в нашем случае, данные малы, поэтому все данные считываются одним исполнителем (т.е. который использует один node). При отсутствии следующего свойства исполнитель использует только одно ядро. Следовательно, все задачи сериализуются.
Установка свойства
sudo vi /etc/hadoop/conf/capacity-scheduler.xml
Установка следующего свойства, как показано на рисунке
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
Чтобы применить это свойство, вам необходимо перезапустить пряжу
sudo hadoop-yarn-resourcemanager stop
Перезапустить пряжу
sudo hadoop-yarn-resourcemanager start
Когда ваша работа подана, посмотрите на пряжу и искру-ui
В пряже вы увидите больше ядер для исполнителя
Ответ 2
Два момента, чтобы упомянуть здесь (не уверены, что они разрешат вашу проблему):
-
wholeTextFiles
использует WholeTextFileInputFormat
, который расширяет CombineFileInputFormat
, а из-за CombineFileInputFormat
он попытается объединить группы небольших файлов в один раздел. Поэтому, если вы зададите число разделов на 2, например, вы можете получить два раздела, но это не гарантируется, это зависит от размера файлов, которые вы читаете.
- Вывод
wholeTextFiles
- это RDD, который содержит весь файл в каждой записи (и каждая запись/файл не может быть разделена так, что он заканчивается тем, что находится в одном разделе/рабочем). Поэтому, если вы читаете только один файл, вы закончите с полным файлом в одном разделе, несмотря на то, что вы установили в ваш пример раздел 4.