Python Manager dict в многопроцессорной обработке

Вот простой многопроцессорный код:

from multiprocessing import Process, Manager

manager = Manager()
d = manager.dict()

def f():
    d[1].append(4)
    print d

if __name__ == '__main__':
    d[1] = []
    p = Process(target=f)
    p.start()
    p.join()

Выход я получаю:

{1: []}

Почему бы мне не получить {1: [4]} в качестве вывода?

Ответы

Ответ 1

Вот что вы написали:

# from here code executes in main process and all child processes
# every process makes all these imports
from multiprocessing import Process, Manager

# every process creates own 'manager' and 'd'
manager = Manager() 
# BTW, Manager is also child process, and 
# in its initialization it creates new Manager, and new Manager
# creates new and new and new
# Did you checked how many python processes were in your system? - a lot!
d = manager.dict()

def f():
    # 'd' - is that 'd', that is defined in globals in this, current process 
    d[1].append(4)
    print d

if __name__ == '__main__':
# from here code executes ONLY in main process 
    d[1] = []
    p = Process(target=f)
    p.start()
    p.join()

Вот что вы должны написать:

from multiprocessing import Process, Manager
def f(d):
    d[1] = d[1] + [4]
    print d

if __name__ == '__main__':
    manager = Manager() # create only 1 mgr
    d = manager.dict() # create only 1 dict
    d[1] = []
    p = Process(target=f,args=(d,)) # say to 'f', in which 'd' it should append
    p.start()
    p.join()

Ответ 2

Я думаю, что это ошибка в вызовах прокси-сервера менеджера. Вы можете обойти избегающие методы вызова общего списка, например:

from multiprocessing import Process, Manager

manager = Manager()
d = manager.dict()

def f():
    # get the shared list
    shared_list = d[1]

    shared_list.append(4)

    # forces the shared list to 
    # be serialized back to manager
    d[1] = shared_list

    print d

if __name__ == '__main__':
    d[1] = []
    p = Process(target=f)
    p.start()
    p.join()

    print d

Ответ 3

from multiprocessing import Process, Manager
manager = Manager()
d = manager.dict()
l=manager.list()

def f():
    l.append(4)
    d[1]=l
    print d

if __name__ == '__main__':
    d[1]=[]
    p = Process(target=f)
    p.start()
    p.join()

Ответ 4

Причина, по которой новый элемент, добавленный к d[1] не печатается, указан в официальная документация Python:

Модификации измененных значений или элементов в прокси файлах dict и list будут не распространяются через менеджера, поскольку у прокси-сервера нет способа зная, когда его значения или элементы будут изменены. Чтобы изменить такой элемент, вы можете повторно назначить измененный объект в прокси-сервер контейнера.

Поэтому на самом деле это происходит:

from multiprocessing import Process, Manager

manager = Manager()
d = manager.dict()

def f():
    # invoke d.__getitem__(), returning a local copy of the empty list assigned by the main process,
    # (consider that a KeyError exception wasn't raised, so a list was definitely returned),
    # and append 4 to it, however this change is not propagated through the manager,
    # as it performed on an ordinary list with which the manager has no interaction
    d[1].append(4)
    # convert d to string via d.__str__() (see https://docs.python.org/2/reference/datamodel.html#object.__str__),
    # returning the "remote" string representation of the object (see https://docs.python.org/2/library/multiprocessing.html#multiprocessing.managers.SyncManager.list),
    # to which the change above was not propagated
    print d

if __name__ == '__main__':
    # invoke d.__setitem__(), propagating this assignment (mapping 1 to an empty list) through the manager
    d[1] = []
    p = Process(target=f)
    p.start()
    p.join()

Переназначение d[1] с новым списком или даже с тем же списком еще раз, после его обновления, запускает менеджер для распространения изменения:

from multiprocessing import Process, Manager

manager = Manager()
d = manager.dict()

def f():
    # perform the exact same steps, as explained in the comments to the previous code snippet above,
    # but in addition, invoke d.__setitem__() with the changed item in order to propagate the change
    l = d[1]
    l.append(4)
    d[1] = l
    print d

if __name__ == '__main__':
    d[1] = []
    p = Process(target=f)
    p.start()
    p.join()

Линия d[1] += [4] также работала бы.


В качестве альтернативы Начиная с Python 3.6, за этот набор изменений после этот вопрос, также возможно использовать вложенные прокси-объекты, которые автоматически распространяют любые изменения, выполненные на них, на содержащий прокси-объект. Таким образом, замена строки d[1] = [] на d[1] = manager.list() также исправит проблему:

from multiprocessing import Process, Manager

manager = Manager()
d = manager.dict()

def f():
    d[1].append(4)
    # the __str__() method of a dict object invokes __repr__() on each of its items,
    # so explicitly invoking __str__() is required in order to print the actual list items
    print({k: str(v) for k, v in d.items()}

if __name__ == '__main__':
    d[1] = manager.list()
    p = Process(target=f)
    p.start()
    p.join()

К сожалению, это исправление ошибки не было перенесено на Python 2.7 (как на Python 2.7.13).


ПРИМЕЧАНИЕ (выполняется под операционной системой Windows):

Несмотря на то, что описанное поведение относится и к операционной системе Windows, прикрепленные фрагменты кода не будут выполняться в Windows из-за механизма создания нового процесса, который опирается на CreateProcess() API, а не на системный вызов fork(), который не поддерживается.

Всякий раз, когда новый процесс создается через модуль многопроцессорности, Windows создает новый процесс интерпретатора Python, который импортирует основной модуль с потенциально опасными побочными эффектами. Чтобы обойти эту проблему, следующее руководство по программированию рекомендуется:

Убедитесь, что основной модуль можно безопасно импортировать с помощью нового интерпретатора Python, не вызывая непреднамеренных побочных эффектов (например, запуск нового процесса).

Следовательно, выполнение прикрепленных фрагментов кода, находящихся под Windows, будет пытаться создать бесконечное количество процессов из-за строки manager = Manager(). Это можно легко устранить, создав объекты Manager и Manager.dict внутри предложения if __name__ == '__main__' и передав объект Manager.dict в качестве аргумента f(), как это сделано в этот ответ.

Подробнее об этом можно узнать в этом ответе.