Параллельный код не работает, когда функция распараллеливания находится в другом файле

У меня есть простая функция, которую я хочу запустить параллельно. Если функция указана напрямую в основной функции, все работает хорошо. Но если та же самая функция вызывается из отдельного файла Python (который создан, чтобы содержать серию вспомогательных функций), код завершается с ошибкой:

Задание не удалось удалить из сериализации. Пожалуйста, убедитесь, что все аргументы функции доступны для выбора.

Я пытался запустить этот код:

from joblib import Parallel, delayed
import multiprocessing
import otherFile as of

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results1 = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs) # this works
results2 = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs) # this fails

Когда я вызываю функцию processInput() из файла, я просто копирую ту же функцию в этот файл .py.

def processInput(i):
    return i * i

Как я могу заставить распараллеливание работать, если функция, которую мне нужно вызвать, находится в отдельном файле .py?

Это полная ошибка:

results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs)
Traceback (most recent call last):

  File "<ipython-input-387-d8dd1dc361a6>", line 1, in <module>
    results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs)

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\parallel.py", line 934, in __call__
    self.retrieve()

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\concurrent\futures\_base.py", line 432, in result
    return self.__get_result()

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.*

Ответы

Ответ 1

просто импортируйте функцию как это

from otherFile import processinput

Ответ 2

Не уверены, что вы проверили, работает ли импортированная функция of.processInput без многопроцессорной обработки? Если это не работает, то это может быть слон в комнате, на который другие не указали. Может быть, вы скучаете

__init__.py

или, может быть, это потому, что каталог не просматривается командой Python import. Чтобы добавить каталог, вы можете сделать:

import sys; sys.path.append("path/to/otherFile/")

Хотя я не уверен, что полученное вами сообщение об ошибке хоть как-то связано с этой проблемой.