Получают ли многопроцессорные пулы каждый процесс одинаковое количество задач или назначаются ли они доступными?
Когда вы map
итерабельны к multiprocessing.Pool
, являются ли итерации, разделенные на очередь для каждого процесса в пуле в начале, или есть общая очередь, из которой выполняется задача, когда процесс освобождается?
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
print moo
pool = multiprocessing.Pool()
pool.map(func=process, iterable=generate_stuff())
pool.close()
Итак, учитывая этот непроверенный код предложения; если в пуле есть 4 процесса, каждый процесс получает выделение 25 штук или же 100 штук извлекаются один за другим процессами, которые ищут вещи для того, чтобы каждый процесс мог выполнять различное количество материалов, например 30, 26, 24, 20.
Ответы
Ответ 1
Итак, учитывая этот непроверенный код предложения; если в пуле есть 4 процесса, каждый процесс получает выделение 25 штук или же 100 штук извлекаются один за другим процессами, которые ищут вещи для того, чтобы каждый процесс мог выполнять различное количество материалов, например 30, 26, 24, 20.
Ну, очевидный ответ - проверить его.
Как есть, тест может не сказать вам многого, потому что задания будут заканчиваться как можно скорее, и возможно, что все будет равномерно распределено, даже если объединенные процессы захватят задания по мере их готовности. Но есть простой способ исправить это:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
#print moo
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
Если числа "зубчатые", вы знаете, что объединенные процессы должны захватывать новые рабочие места как готовые. (Я явно установил chunksize
в 1, чтобы убедиться, что куски не такие большие, что каждый из них получает только один кусок.)
Когда я запускаю его на 8-ядерную машину:
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
Итак, похоже, что процессы получают новые задания "на лету".
Поскольку вы специально спросили около 4 рабочих, я изменил Pool()
на Pool(4)
и получил следующее:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
Однако есть еще лучший способ узнать, чем тестировать: прочитайте источник.
Как вы можете видеть, map
просто вызывает map_async
, который создает кучу пакетов и помещает их в объект self._taskqueue
(экземпляр Queue.Queue
). Если вы читаете дальше, эта очередь напрямую не передается другим процессам, но есть поток диспетчера пула, который, когда процесс заканчивается и возвращает результат, выдает следующее задание из очереди и отправляет его обратно в процесс.
Вот как вы можете узнать, что такое chunksize по умолчанию для map
. Приведенная выше реализация 2.7 показывает, что она просто len(iterable) / (len(self._pool) * 4)
округлена (немного более многословная, чем эта, чтобы избежать дробной арифметики), или, иначе говоря, достаточно просто для примерно 4 кусков на процесс. Но вы действительно не должны полагаться на это; документация смутно и косвенно подразумевает, что она собирается использовать какую-то эвристику, но не дает вам никаких гарантий относительно того, что это будет. Итак, если вам действительно нужно "около 4 кусков за процесс", выведите его явно. Более реалистично, если вам когда-нибудь понадобится что-то помимо стандартного, вам, вероятно, понадобится определенное для домена значение, которое вы собираетесь выполнить (путем вычисления, угадывания или профилирования).
Ответ 2
http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map
map(func, iterable[, chunksize])
Этот метод прерывает итерацию в несколько кусков, которые он подает в пул процессов как отдельные задачи. (Приблизительный) размер этих кусков можно указать, установив chunksize на положительный целое число.
Я предполагаю, что процесс подбирает следующий фрагмент из очереди, когда выполняется предыдущий фрагмент.
Значение по умолчанию chunksize
зависит от длины iterable
и выбирается таким образом, чтобы количество блоков составляло в четыре раза больше количества процессов. (источник)
Ответ 3
Чтобы оценить chunksize
, используемый реализацией Python, не глядя на исходный код модуля multiprocessing
, запустите:
#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby
def work(index):
mp.get_logger().info(index)
return index, mp.current_process().name
if __name__ == "__main__":
import logging
import sys
logger = mp.log_to_stderr()
# process cmdline args
try:
sys.argv.remove('--verbose')
except ValueError:
pass # not verbose
else:
logger.setLevel(logging.INFO) # verbose
nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
# choices: 'map', 'imap', 'imap_unordered'
map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}
# estimate chunksize used
max_chunksize = 0
map_func = getattr(mp.Pool(nprocesses), map_name)
for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
key=lambda x: x[0]), # sort by index
key=lambda x: x[1]): # group by process name
max_chunksize = max(max_chunksize, len(list(group)))
print("%s: max_chunksize %d" % (map_name, max_chunksize))
Показывает, что imap
, imap_unordered
по умолчанию используется chunksize=1
, а max_chunksize
для map
зависит от nprocesses
, nitem
(количество блоков в процессе не фиксировано) и max_chunksize
зависит от версии python. Все функции *map*
учитывают параметр chunksize
, если он указан.
Использование
$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]
Чтобы узнать, как распределяются отдельные задания; укажите параметр --verbose
.