Многопроцессорный пул Python pool.map для нескольких аргументов

В библиотеке многопроцессорности Python существует ли вариант pool.map, который поддерживает несколько аргументов?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()

Ответы

Ответ 1

Ответ на этот вопрос зависит от версии и ситуации. Самый общий ответ для последних версий Python (начиная с 3.3) был впервые описан ниже J.F. Себастьян. 1 Он использует метод Pool.starmap, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их в заданную функцию:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Для более ранних версий Python вам нужно написать вспомогательную функцию, чтобы явно распаковать аргументы. Если вы хотите использовать with, вам также потребуется написать оболочку, чтобы превратить Pool в диспетчер контекстов. (Спасибо muon за это.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

В более простых случаях с фиксированным вторым аргументом вы также можете использовать partial, но только в Python 2.7 +.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Большая часть этого была вдохновлена ​​его ответом, который, вероятно, должен был быть принят вместо этого. Но так как этот застрял наверху, лучше всего улучшить его для будущих читателей.

Ответ 2

Есть ли вариант pool.map, который поддерживает несколько аргументов?

Python 3.3 включает pool.starmap() метод:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Для более старых версий:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Выход

1 1
2 1
3 1

Обратите внимание, что здесь itertools.izip() и itertools.repeat().

Из-за ошибки, упомянутой @unutbu, вы не можете использовать functools.partial() или аналогичные возможности на Python 2.6, поэтому простая описательная функция func_star() должна быть явно определена. См. Также обходной путь предложенный uptimebox.

Ответ 3

Я думаю, что ниже будет лучше

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

Выход

[3, 5, 7]

Ответ 4

Использование Python 3.3 + с pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Результат:

1 --- 4
2 --- 5
3 --- 6

Вы также можете сделать zip() больше аргументов: zip(a,b,c,d,e)

Если вы хотите иметь постоянное значение, переданное как аргумент, вы должны использовать import itertools, а затем zip(itertools.repeat(constant), a) например.

Ответ 5

Узнав об itertools в J.F. Sebastian. Я решил сделать еще один шаг и написать пакет parmap, который заботится о распараллеливании, предлагая функции map и starmap на python-2.7 и python-3.2 (и позже также), что может принимать любое количество позиционных аргументов.

Установка

pip install parmap

Как распараллеливать:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Я загрузил parmap в PyPI и в репозиторий github.

В качестве примера на вопрос можно ответить следующим образом:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

Ответ 6

Там вилка multiprocessing называется pathos (обратите внимание: используйте версию на github), которая не нуждается в starmap - функции карты зеркало API для карты python, поэтому карта может принимать несколько аргументов. С помощью pathos вы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застревать в блоке __main__. Пафос должен быть выпущен после некоторого мягкого обновления - в основном преобразования в python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

Ответ 7

Вы можете использовать следующие две функции, чтобы избежать написания обертки для каждой новой функции:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Используйте функцию function со списком аргументов arg_0, arg_1 и arg_2 следующим образом:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

Ответ 8

Лучше использовать декоратор вместо того, чтобы вручную писать функцию обертки. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая писать обертку для каждой функции. Обычно украшенная функция не подбирается, однако мы можем использовать functools, чтобы обойти ее. Более дискурсии можно найти здесь.

Здесь пример

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Затем вы можете сопоставить его с заархивированными аргументами

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Конечно, вы всегда можете использовать Pool.starmap в Python 3 ( >= 3.3), как упоминалось в других ответах.

Ответ 9

Еще одна простая альтернатива - обернуть параметры функции в кортеж, а затем обернуть параметры, которые также должны быть переданы в кортежах. Это, возможно, не идеально подходит для обработки больших фрагментов данных. Я считаю, что он будет делать копии для каждого кортежа.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Дает вывод в некотором случайном порядке:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

Ответ 10

Лучшее решение для python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

из []:

[3, 5, 7]

Ответ 11

Другой способ - передать список списков в однопараметрическую процедуру:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Можно построить список списков аргументов одним любимым способом.

Ответ 12

Из python 3.4.4 вы можете использовать multiprocessing.get_context() для получения объекта контекста для использования нескольких методов запуска:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Или просто просто замените

pool.map(harvester(text,case),case, 1)

по:

pool.apply_async(harvester(text,case),case, 1)

Ответ 13

# "Как принимать несколько аргументов".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)

Ответ 14

В официальной документации указано, что он поддерживает только один итеративный аргумент. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

Ответ 15

text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()

Ответ 16

Это пример подпрограммы, которую я использую для передачи нескольких аргументов в функцию с одним аргументом, используемую в форке pool.imap:

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()

Ответ 17

для python2 вы можете использовать этот трюк

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))