Python Manager dict в многопроцессорной обработке
Вот простой многопроцессорный код:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
def f():
d[1].append(4)
print d
if __name__ == '__main__':
d[1] = []
p = Process(target=f)
p.start()
p.join()
Выход я получаю:
{1: []}
Почему бы мне не получить {1: [4]}
в качестве вывода?
Ответы
Ответ 1
Вот что вы написали:
# from here code executes in main process and all child processes
# every process makes all these imports
from multiprocessing import Process, Manager
# every process creates own 'manager' and 'd'
manager = Manager()
# BTW, Manager is also child process, and
# in its initialization it creates new Manager, and new Manager
# creates new and new and new
# Did you checked how many python processes were in your system? - a lot!
d = manager.dict()
def f():
# 'd' - is that 'd', that is defined in globals in this, current process
d[1].append(4)
print d
if __name__ == '__main__':
# from here code executes ONLY in main process
d[1] = []
p = Process(target=f)
p.start()
p.join()
Вот что вы должны написать:
from multiprocessing import Process, Manager
def f(d):
d[1] = d[1] + [4]
print d
if __name__ == '__main__':
manager = Manager() # create only 1 mgr
d = manager.dict() # create only 1 dict
d[1] = []
p = Process(target=f,args=(d,)) # say to 'f', in which 'd' it should append
p.start()
p.join()
Ответ 2
Я думаю, что это ошибка в вызовах прокси-сервера менеджера. Вы можете обойти избегающие методы вызова общего списка, например:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
def f():
# get the shared list
shared_list = d[1]
shared_list.append(4)
# forces the shared list to
# be serialized back to manager
d[1] = shared_list
print d
if __name__ == '__main__':
d[1] = []
p = Process(target=f)
p.start()
p.join()
print d
Ответ 3
from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
l=manager.list()
def f():
l.append(4)
d[1]=l
print d
if __name__ == '__main__':
d[1]=[]
p = Process(target=f)
p.start()
p.join()
Ответ 4
Причина, по которой новый элемент, добавленный к d[1]
не печатается, указан в официальная документация Python:
Модификации измененных значений или элементов в прокси файлах dict и list будут не распространяются через менеджера, поскольку у прокси-сервера нет способа зная, когда его значения или элементы будут изменены. Чтобы изменить такой элемент, вы можете повторно назначить измененный объект в прокси-сервер контейнера.
Поэтому на самом деле это происходит:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
def f():
# invoke d.__getitem__(), returning a local copy of the empty list assigned by the main process,
# (consider that a KeyError exception wasn't raised, so a list was definitely returned),
# and append 4 to it, however this change is not propagated through the manager,
# as it performed on an ordinary list with which the manager has no interaction
d[1].append(4)
# convert d to string via d.__str__() (see https://docs.python.org/2/reference/datamodel.html#object.__str__),
# returning the "remote" string representation of the object (see https://docs.python.org/2/library/multiprocessing.html#multiprocessing.managers.SyncManager.list),
# to which the change above was not propagated
print d
if __name__ == '__main__':
# invoke d.__setitem__(), propagating this assignment (mapping 1 to an empty list) through the manager
d[1] = []
p = Process(target=f)
p.start()
p.join()
Переназначение d[1]
с новым списком или даже с тем же списком еще раз, после его обновления, запускает менеджер для распространения изменения:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
def f():
# perform the exact same steps, as explained in the comments to the previous code snippet above,
# but in addition, invoke d.__setitem__() with the changed item in order to propagate the change
l = d[1]
l.append(4)
d[1] = l
print d
if __name__ == '__main__':
d[1] = []
p = Process(target=f)
p.start()
p.join()
Линия d[1] += [4]
также работала бы.
В качестве альтернативы Начиная с Python 3.6, за этот набор изменений после этот вопрос, также возможно использовать вложенные прокси-объекты, которые автоматически распространяют любые изменения, выполненные на них, на содержащий прокси-объект. Таким образом, замена строки d[1] = []
на d[1] = manager.list()
также исправит проблему:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
def f():
d[1].append(4)
# the __str__() method of a dict object invokes __repr__() on each of its items,
# so explicitly invoking __str__() is required in order to print the actual list items
print({k: str(v) for k, v in d.items()}
if __name__ == '__main__':
d[1] = manager.list()
p = Process(target=f)
p.start()
p.join()
К сожалению, это исправление ошибки не было перенесено на Python 2.7 (как на Python 2.7.13).
ПРИМЕЧАНИЕ (выполняется под операционной системой Windows):
Несмотря на то, что описанное поведение относится и к операционной системе Windows, прикрепленные фрагменты кода не будут выполняться в Windows из-за механизма создания нового процесса, который опирается на CreateProcess()
API, а не на системный вызов fork()
, который не поддерживается.
Всякий раз, когда новый процесс создается через модуль многопроцессорности, Windows создает новый процесс интерпретатора Python, который импортирует основной модуль с потенциально опасными побочными эффектами. Чтобы обойти эту проблему, следующее руководство по программированию рекомендуется:
Убедитесь, что основной модуль можно безопасно импортировать с помощью нового интерпретатора Python, не вызывая непреднамеренных побочных эффектов (например, запуск нового процесса).
Следовательно, выполнение прикрепленных фрагментов кода, находящихся под Windows, будет пытаться создать бесконечное количество процессов из-за строки manager = Manager()
. Это можно легко устранить, создав объекты Manager
и Manager.dict
внутри предложения if __name__ == '__main__'
и передав объект Manager.dict
в качестве аргумента f()
, как это сделано в этот ответ.
Подробнее об этом можно узнать в этом ответе.