Многопроцессорный пул Python pool.map для нескольких аргументов
В библиотеке многопроцессорности Python существует ли вариант pool.map, который поддерживает несколько аргументов?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
Ответы
Ответ 1
Ответ на этот вопрос зависит от версии и ситуации. Самый общий ответ для последних версий Python (начиная с 3.3) был впервые описан ниже J.F. Себастьян. 1 Он использует метод Pool.starmap
, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их в заданную функцию:
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
Для более ранних версий Python вам нужно написать вспомогательную функцию, чтобы явно распаковать аргументы. Если вы хотите использовать with
, вам также потребуется написать оболочку, чтобы превратить Pool
в диспетчер контекстов. (Спасибо muon за это.)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
В более простых случаях с фиксированным вторым аргументом вы также можете использовать partial
, но только в Python 2.7 +.
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Большая часть этого была вдохновлена его ответом, который, вероятно, должен был быть принят вместо этого. Но так как этот застрял наверху, лучше всего улучшить его для будущих читателей.
Ответ 2
Есть ли вариант pool.map, который поддерживает несколько аргументов?
Python 3.3 включает pool.starmap()
метод:
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
Для более старых версий:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
Выход
1 1
2 1
3 1
Обратите внимание, что здесь itertools.izip()
и itertools.repeat()
.
Из-за ошибки, упомянутой @unutbu, вы не можете использовать functools.partial()
или аналогичные возможности на Python 2.6, поэтому простая описательная функция func_star()
должна быть явно определена. См. Также обходной путь предложенный uptimebox
.
Ответ 3
Я думаю, что ниже будет лучше
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
Выход
[3, 5, 7]
Ответ 4
Использование Python 3.3 + с pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
Результат:
1 --- 4
2 --- 5
3 --- 6
Вы также можете сделать zip() больше аргументов: zip(a,b,c,d,e)
Если вы хотите иметь постоянное значение, переданное как аргумент, вы должны использовать import itertools
, а затем zip(itertools.repeat(constant), a)
например.
Ответ 5
Узнав об itertools в J.F. Sebastian. Я решил сделать еще один шаг и написать пакет parmap
, который заботится о распараллеливании, предлагая функции map
и starmap
на python-2.7 и python-3.2 (и позже также), что может принимать любое количество позиционных аргументов.
Установка
pip install parmap
Как распараллеливать:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
Я загрузил parmap в PyPI и в репозиторий github.
В качестве примера на вопрос можно ответить следующим образом:
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __name__ == "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
Ответ 6
Там вилка multiprocessing
называется pathos (обратите внимание: используйте версию на github), которая не нуждается в starmap
- функции карты зеркало API для карты python, поэтому карта может принимать несколько аргументов. С помощью pathos
вы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застревать в блоке __main__
. Пафос должен быть выпущен после некоторого мягкого обновления - в основном преобразования в python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
Ответ 7
Вы можете использовать следующие две функции, чтобы избежать написания обертки для каждой новой функции:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
Используйте функцию function
со списком аргументов arg_0
, arg_1
и arg_2
следующим образом:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Ответ 8
Лучше использовать декоратор вместо того, чтобы вручную писать функцию обертки. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая писать обертку для каждой функции. Обычно украшенная функция не подбирается, однако мы можем использовать functools
, чтобы обойти ее. Более дискурсии можно найти здесь.
Здесь пример
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
Затем вы можете сопоставить его с заархивированными аргументами
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Конечно, вы всегда можете использовать Pool.starmap
в Python 3 ( >= 3.3), как упоминалось в других ответах.
Ответ 9
Еще одна простая альтернатива - обернуть параметры функции в кортеж, а затем обернуть параметры, которые также должны быть переданы в кортежах. Это, возможно, не идеально подходит для обработки больших фрагментов данных. Я считаю, что он будет делать копии для каждого кортежа.
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __name__ == '__main__':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
Дает вывод в некотором случайном порядке:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Ответ 10
Лучшее решение для python2:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
из []:
[3, 5, 7]
Ответ 11
Другой способ - передать список списков в однопараметрическую процедуру:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
Можно построить список списков аргументов одним любимым способом.
Ответ 12
Из python 3.4.4 вы можете использовать multiprocessing.get_context() для получения объекта контекста для использования нескольких методов запуска:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
Или просто просто замените
pool.map(harvester(text,case),case, 1)
по:
pool.apply_async(harvester(text,case),case, 1)
Ответ 13
# "Как принимать несколько аргументов".
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __name__ == "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
Ответ 14
В официальной документации указано, что он поддерживает только один итеративный аргумент. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
Ответ 15
text = "test"
def unpack(args):
return args[0](*args[1:])
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
# args is a list of tuples
# with the function to execute as the first item in each tuple
args = [(harvester, text, c) for c in case]
# doing it this way, we can pass any function
# and we don't need to define a wrapper for each different function
# if we need to use more than one
pool.map(unpack, args)
pool.close()
pool.join()
Ответ 16
Это пример подпрограммы, которую я использую для передачи нескольких аргументов в функцию с одним аргументом, используемую в форке pool.imap:
from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
def __init__(self, var2):
self.var2 = var2
def fun(self, i):
var2 = self.var2
return var1[i] + var2
# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]
# Open the pool:
pool = Pool(processes=2)
# Wrapper loop
for j in range(len(var2)):
# Obtain the function to map
pool_fun = makefun(var2[j]).fun
# Fork loop
for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
print(var1[i], '+' ,var2[j], '=', value)
# Close the pool
pool.close()
Ответ 17
для python2 вы можете использовать этот трюк
def fun(a,b):
return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))