Передача нескольких параметров функции pool.map() в Python
Мне нужно каким-то образом использовать функцию в pool.map(), которая принимает более одного параметра. По моему пониманию, целевая функция pool.map() может иметь только один итеративный параметр, но есть ли способ, которым я могу передать и другие параметры? В этом случае мне нужно передать несколько переменных конфигурации, например, мою Lock() и информацию о регистрации целевой функции.
Я попытался провести некоторое исследование, и я думаю, что я могу использовать частичные функции, чтобы заставить его работать? Однако я не совсем понимаю, как они работают. Любая помощь будет принята с благодарностью! Вот простой пример того, что я хочу сделать:
def target(items, lock):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
pool.map(target(PASS PARAMS HERE), iterable)
pool.close()
pool.join()
Ответы
Ответ 1
Вы можете использовать functools.partial
для этого (как вы подозревали):
from functools import partial
def target(lock, iterable_item):
for item in iterable_item:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
l = multiprocessing.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()
Пример:
def f(a, b, c):
print("{} {} {}".format(a, b, c))
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
a = "hi"
b = "there"
func = partial(f, a, b)
pool.map(func, iterable)
pool.close()
pool.join()
if __name__ == "__main__":
main()
Вывод:
hi there 1
hi there 2
hi there 3
hi there 4
hi there 5
Ответ 2
Вы можете использовать функцию карты, которая допускает несколько аргументов, так же как и fork multiprocessing
в pathos
.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def add_and_subtract(x,y):
... return x+y, x-y
...
>>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))
>>> res
[(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)]
>>> Pool().map(add_and_subtract, *zip(*res))
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]
pathos
позволяет легко встраивать иерархические параллельные карты с несколькими входами, поэтому мы можем расширить наш пример, чтобы продемонстрировать это.
>>> from pathos.multiprocessing import ThreadingPool as TPool
>>>
>>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))))
>>> res.get()
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]
Еще больше удовольствия, заключается в создании вложенной функции, которую мы можем передать в пул.
Это возможно, потому что pathos
использует dill
, который может сериализовать почти что угодно в python.
>>> def build_fun_things(f, g):
... def do_fun_things(x, y):
... return f(x,y), g(x,y)
... return do_fun_things
...
>>> def add(x,y):
... return x+y
...
>>> def sub(x,y):
... return x-y
...
>>> neato = build_fun_things(add, sub)
>>>
>>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1))))
>>> list(res)
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]
Если вы не можете выйти за пределы стандартной библиотеки, вам придется сделать это по-другому. Лучше всего в этом случае использовать multiprocessing.starmap
, как показано здесь: Python multiprocessing pool.map для нескольких аргументов (отмечен @Roberto в комментариях к сообщению OP )
Получить pathos
здесь: https://github.com/uqfoundation
Ответ 3
Если у вас нет доступа к functools.partial
, вы также можете использовать для этого функцию-оболочку.
def target(lock):
def wrapped_func(items):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
return wrapped_func
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
lck = multiprocessing.Lock()
pool.map(target(lck), iterable)
pool.close()
pool.join()
Это превращает target()
в функцию, которая принимает блокировку (или любые параметры, которые вы хотите дать), и она вернет функцию, которая принимает только итерабельность в качестве входных данных, но все равно может использовать все остальные параметры. Это то, что в конечном итоге передано в pool.map()
, которое затем должно выполняться без проблем.