Как использовать пользовательские классы с Apache Spark (pyspark)?

Я написал класс, реализующий классификатор в python. Я хотел бы использовать Apache Spark для параллелизации классификации огромного количества точек данных, использующих этот классификатор.

  • Я настроен с использованием Amazon EC2 на кластере с 10 подчиненными устройствами, основанными на ami, который поставляется с дистрибутивом Anaconda на Python. Ami позволяет мне использовать IPython Notebook удаленно.
  • Я определил класс BoTree в файле BoTree.py в главном файле в папке /root/anaconda/lib/python 2.7/, где все мои модули python
  • Я проверил, что могу импортировать и использовать BoTree.py при запуске командной строки от мастера (мне просто нужно начать с написания импорта BoTree, и мой класс BoTree станет доступен
  • Я использовал spark/root/spark-ec2/copy-dir.sh script, чтобы скопировать каталог/python2.7/в моем кластере.
  • Я отправил ssh-ed в один из подчиненных устройств и попробовал запустить ipython, и смог импортировать BoTree, поэтому я думаю, что модуль был успешно отправлен по кластеру (я также могу увидеть файл BoTree.py в папка... /python 2.7/)
  • На хозяине, который я проверил, я могу разборки и разборки экземпляра BoTree с помощью cPickle, который, как я понимаю, является сериализатором pyspark.

Однако, когда я делаю следующее:

import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()

Искра не с ошибкой (только соответствующий бит, я думаю):

  File "/root/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/root/spark/python/pyspark/serializers.py", line 405, in loads
    return cPickle.loads(obj)
ImportError: No module named BoroughTree

Может ли кто-нибудь мне помочь? Отчасти отчаянно...

Спасибо

Ответы

Ответ 1

Возможно, самым простым решением является использование аргумента pyFiles при создании SparkContext

from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])

Каждый файл, размещенный там, будет отправлен рабочим и добавлен в PYTHONPATH.

Если вы работаете в интерактивном режиме, вы должны остановить существующий контекст, используя sc.stop(), прежде чем создавать новый.

Также убедитесь, что рабочий Spark фактически использует дистрибутив Anaconda, а не интерпретатор Python по умолчанию. Основываясь на вашем описании, это, скорее всего, проблема. Чтобы установить PYSPARK_PYTHON, вы можете использовать conf/spark-env.sh файлы.

На стороне примечание копирование файла на lib является довольно грязным решением. Если вы хотите избежать нажатия файлов с помощью pyFiles, я бы рекомендовал создать простой пакет Python или пакет Conda и правильную установку. Таким образом, вы можете легко отслеживать, что установлено, удалять ненужные пакеты и избегать проблем с отладки.

Ответ 2

После получения SparkContext можно также использовать addPyFile для последующего отправки модуля каждому работнику.

sc.addPyFile('/path/to/BoTree.py')

документа pyspark.SparkContext.addPyFile(путь)