Невозможно рассортировать <тип 'instancemethod'> при использовании многопроцессорности Pool.map()
Я пытаюсь использовать функцию multiprocessing
Pool.map()
для разделения работы одновременно. Когда я использую следующий код, он отлично работает:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
Однако, когда я использую его в более объектно-ориентированном подходе, он не работает. Это сообщение об ошибке:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
Это происходит, когда следующая моя основная программа:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
и следующий класс someClass
:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
Кто-нибудь знает, в чем проблема, или простой способ обойти его?
Ответы
Ответ 1
Проблема заключается в том, что многопроцессорность должна рассортировать вещи, чтобы перевязать их между процессами, а связанные методы не разборчивы. Обходной путь (независимо от того, считаете ли вы его "легким" или нет;-) заключается в том, чтобы добавить инфраструктуру в вашу программу, чтобы позволить таким методам мариноваться, регистрируя ее с помощью copy_reg.
Например, вклад Стивена Бетарда в этот поток (к концу потока) показывает один вполне работоспособный подход, позволяющий методу травления/разбрасывания через copy_reg
.
Ответ 2
Все эти решения уродливы, потому что многопроцессорность и травление прерываются и ограничены, если вы не прыгаете за пределы стандартной библиотеки.
Если вы используете вилку multiprocessing
под названием pathos.multiprocesssing
, вы можете напрямую использовать классы и методы класса в многопроцессорных функциях map
. Это связано с тем, что вместо pickle
или cPickle
используется dill
, а dill
может сериализовать почти что угодно в python.
pathos.multiprocessing
также предоставляет функцию асинхронной карты... и она может map
функционировать с несколькими аргументами (например, map(math.pow, [1,2,3], [4,5,6])
)
См:
Что может сделать многопроцессорность и укроп?
и:
http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
И только для того, чтобы быть явным, вы можете сделать именно то, что хотите сделать в первую очередь, и можете сделать это из интерпретатора, если хотите.
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
Получить код здесь:
https://github.com/uqfoundation/pathos
Ответ 3
Вы также можете определить метод __call__()
внутри вашего someClass()
, который вызывает someClass.go()
, а затем передать экземпляр someClass()
в пул. Этот объект разборчиво, и он отлично работает (для меня)...
Ответ 4
Некоторые ограничения, хотя решение Стивена Бетарда:
Когда вы регистрируете свой метод класса как функцию, деструктор вашего класса вызывает удивление каждый раз, когда обработка вашего метода завершена. Поэтому, если у вас есть один экземпляр вашего класса, который называет n раз его метод, члены могут исчезать между двумя прогонами, и вы можете получить сообщение malloc: *** error for object 0x...: pointer being freed was not allocated
(например, открытый файл-член) или pure virtual method called,
terminate called without an active exception
(что означает срок жизни члена объект, который я использовал, был короче, чем я думал). Я получил это, когда занимался n большим, чем размер пула. Вот краткий пример:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
Вывод:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
Метод __call__
не так эквивалентен, потому что [None,...] считываются из результатов:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
Таким образом, ни один из обоих методов не удовлетворяет...
Ответ 5
Там может быть и другой короткий фрагмент, хотя он может быть неэффективным в зависимости от того, что в ваших экземплярах класса.
Как все говорили, проблема в том, что код multiprocessing
должен рассортировать вещи, которые он отправляет, в подпроцессы, которые он запускал, а pickler не выполняет экземпляры-методы.
Однако вместо отправки экземпляра-метода вы можете отправить фактический экземпляр класса, а также имя вызываемой функции, в обычную функцию, которая затем использует getattr
для вызова метода экземпляра, создавая тем самым связанный метод в подпроцессе Pool
. Это похоже на определение метода __call__
, за исключением того, что вы можете вызывать более одной функции-члена.
Кража @EricH. код из его ответа и немного аннотируя его (я перепечатал его, следовательно, все изменения имени и т.д. по какой-то причине это показалось проще, чем вырезать-вставить:-)) для иллюстрации всей магии:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
Результат показывает, что конструктор, как правило, вызывается один раз (в исходном pid), а деструктор называется 9 раз (один раз для каждой копии, сделанной = 2 или 3 раза для каждого рабочего процесса пула, плюс один раз в исходном процессе). Это часто бывает нормально, так как в этом случае, поскольку сортировщик по умолчанию делает копию всего экземпляра и (semi-) тайно повторно заполняет его - в этом случае, делая:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
- поэтому, хотя деструктор называется восемь раз в трех рабочих процессах, он отсчитывает от 1 до 0 каждый раз, но, конечно, вы все равно можете попасть в проблему таким образом. При необходимости вы можете предоставить свой собственный __setstate__
:
def __setstate__(self, adict):
self.count = adict['count']
в этом случае, например.
Ответ 6
Вы также можете определить метод __call__()
внутри вашего someClass()
, который вызывает someClass.go()
, а затем передать экземпляр someClass()
в пул. Этот объект разборчиво, и он отлично работает (для меня)...
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
Ответ 7
Решение из parisjohn выше отлично работает со мной. Плюс код выглядит чистым и простым для понимания. В моем случае есть несколько функций для вызова с использованием Pool, поэтому я изменил код parisjohn чуть ниже. Я сделал вызов, чтобы иметь возможность вызывать несколько функций, а имена функций передаются в аргументе dict из go()
:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()
Ответ 8
Потенциально тривиальное решение этого - переключиться на использование multiprocessing.dummy
. Это потоковая реализация интерфейса многопроцессорности, который, похоже, не имеет этой проблемы в Python 2.7. У меня нет большого опыта здесь, но это быстрое изменение импорта позволило мне вызвать apply_async для метода класса.
Несколько хороших ресурсов на multiprocessing.dummy
:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy
http://chriskiehl.com/article/parallelism-in-one-line/
Ответ 9
В этом простом случае, когда someClass.f
не наследует никаких данных из класса и ничего не присоединяет к классу, возможное решение состоит в том, чтобы отделить f
, чтобы его можно было засолить:
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
Ответ 10
Почему бы не использовать отдельную функцию?
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
Ответ 11
Я столкнулся с этой же проблемой, но обнаружил, что есть кодировщик JSON, который можно использовать для перемещения этих объектов между процессами.
from pyVmomi.VmomiSupport import VmomiJSONEncoder
Используйте это, чтобы создать свой список:
jsonSerialized= json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)
Затем в отображенной функции используйте это для восстановления объекта:
pfVmomiObj = json.loads(jsonSerialized)
Ответ 12
Обновление: по состоянию на день написания, namedTuples можно выбирать (начиная с python 2.7)
Проблема здесь в том, что дочерние процессы не могут импортировать класс объекта -in, в этом случае, класс P-, в случае многомодельного проекта класс P должен импортироваться везде, где получен дочерний процесс. б
быстрый обходной путь - сделать его импортируемым, воздействуя на глобальные переменные()
globals()["P"] = P