Ответ 1
Ваша проблема в том, что объекты блокировки не могут быть выбраны. В этом случае я вижу два возможных решения.
-
Чтобы избежать этого, вы можете сделать вашу переменную блокировки глобальной переменной. Затем вы сможете ссылаться на него в своей функции процесса пула непосредственно как глобальную переменную и не должны передавать ее в качестве аргумента функции процесса пула. Это работает, потому что Python использует механизм
OS fork
при создании процессов пула и, следовательно, копирует все содержимое процесса, которое создает для них процессы пула. Это единственный способ передачи блокировки в процесс Python, созданный с помощью пакета многопроцессорности. Кстати, нет необходимости использовать классManager
только для этой блокировки. С этим изменением ваш код будет выглядеть так:import multiprocessing from functools import partial lock = None # Global definition of lock pool = None # Global definition of pool def make_network(initial_tag, max_tags=2, max_iter=3): global lock global pool lock = multiprocessing.Lock() pool = multiprocessing.Pool(8) def get_more_tags(): global lock pass # this is a very expensive function that I would like to parallelize # over a list of tags. It involves a (relatively cheap) call to an external # database, which needs a lock to avoid simultaneous queries. It takes a # list of strings (tags) as its sole argument, and returns a list of sets # with entries corresponding to the input list. f = partial(get_more_tags, max_tags=max_tags) def _recursively_find_more_tags(tags, level): global pool if level >= max_iter: raise StopIteration new_tags = pool.map(f, tags) to_search = [] for i, s in zip(tags, new_tags): for t in s: joined = ' '.join(t) print(i + "|" + joined) to_search.append(joined) try: return _recursively_find_more_tags(to_search, level + 1) except StopIteration: return None _recursively_find_more_tags([initial_tag], 0)
В вашем реальном коде возможно, что переменные блокировки и пула могут быть переменными экземпляра класса.
- Второе решение, которое вообще избегает использования блокировок, но может иметь несколько более высокие накладные расходы, заключается в создании другого процесса с помощью
multiprocessing.Process
и подключении его черезmultiprocessing.Queue
к каждому из ваших процессов пула. Этот процесс будет отвечать за запуск запроса к базе данных. Вы должны использовать очередь, чтобы позволить процессам пула отправлять параметры процессу, который управлял запросом базы данных. Поскольку все процессы пула будут использовать одну и ту же очередь, доступ к базе данных будет автоматически сериализован. Дополнительные накладные расходы будут возникать из-за травления/рассыпания аргументов запроса базы данных и ответа на запрос. Обратите внимание, что вы можете передать объектmultiprocessing.Queue
в процесс пула в качестве аргумента. Также обратите внимание, что решениеmultiprocessing.Lock
не будет работать наWindows
, где процесс не создается семантикойfork
.