Как использовать пользовательские классы с Apache Spark (pyspark)?
Я написал класс, реализующий классификатор в python. Я хотел бы использовать Apache Spark для параллелизации классификации огромного количества точек данных, использующих этот классификатор.
- Я настроен с использованием Amazon EC2 на кластере с 10 подчиненными устройствами, основанными на ami, который поставляется с дистрибутивом Anaconda на Python. Ami позволяет мне использовать IPython Notebook удаленно.
- Я определил класс BoTree в файле BoTree.py в главном файле в папке /root/anaconda/lib/python 2.7/, где все мои модули python
- Я проверил, что могу импортировать и использовать BoTree.py при запуске командной строки от мастера (мне просто нужно начать с написания импорта BoTree, и мой класс BoTree станет доступен
- Я использовал spark/root/spark-ec2/copy-dir.sh script, чтобы скопировать каталог/python2.7/в моем кластере.
- Я отправил ssh-ed в один из подчиненных устройств и попробовал запустить ipython, и смог импортировать BoTree, поэтому я думаю, что модуль был успешно отправлен по кластеру (я также могу увидеть файл BoTree.py в папка... /python 2.7/)
- На хозяине, который я проверил, я могу разборки и разборки экземпляра BoTree с помощью cPickle, который, как я понимаю, является сериализатором pyspark.
Однако, когда я делаю следующее:
import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()
Искра не с ошибкой (только соответствующий бит, я думаю):
File "/root/spark/python/pyspark/worker.py", line 90, in main
command = pickleSer.loads(command.value)
File "/root/spark/python/pyspark/serializers.py", line 405, in loads
return cPickle.loads(obj)
ImportError: No module named BoroughTree
Может ли кто-нибудь мне помочь? Отчасти отчаянно...
Спасибо
Ответы
Ответ 1
Возможно, самым простым решением является использование аргумента pyFiles
при создании SparkContext
from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])
Каждый файл, размещенный там, будет отправлен рабочим и добавлен в PYTHONPATH
.
Если вы работаете в интерактивном режиме, вы должны остановить существующий контекст, используя sc.stop()
, прежде чем создавать новый.
Также убедитесь, что рабочий Spark фактически использует дистрибутив Anaconda, а не интерпретатор Python по умолчанию. Основываясь на вашем описании, это, скорее всего, проблема. Чтобы установить PYSPARK_PYTHON
, вы можете использовать conf/spark-env.sh
файлы.
На стороне примечание копирование файла на lib
является довольно грязным решением. Если вы хотите избежать нажатия файлов с помощью pyFiles
, я бы рекомендовал создать простой пакет Python или пакет Conda и правильную установку. Таким образом, вы можете легко отслеживать, что установлено, удалять ненужные пакеты и избегать проблем с отладки.
Ответ 2
После получения SparkContext можно также использовать addPyFile
для последующего отправки модуля каждому работнику.
sc.addPyFile('/path/to/BoTree.py')
документа pyspark.SparkContext.addPyFile(путь)