Многопроцессорные функции вызова pool.map в определенном порядке

Как я могу сделать multiprocessing.pool.map распределять процессы в числовом порядке?


Дополнительная информация:
У меня есть программа, которая обрабатывает несколько тысяч файлов данных, составляя график каждого из них. Я использую multiprocessing.pool.map для распространения каждого файла на процессор, и он отлично работает. Иногда это занимает много времени, и было бы неплохо смотреть на выходные изображения по мере запуска программы. Это было бы намного проще, если процесс карты распределял моментальные снимки по порядку; вместо этого для конкретного запуска, который я только что выполнил, были проанализированы первые 8 снимков: 0, 78, 156, 234, 312, 390, 468, 546. Есть ли способ заставить их более тесно распространять их в численном порядке?


Пример:
Здесь пример кода, который содержит те же ключевые элементы, и показывает тот же основной результат:

import sys
from multiprocessing import Pool
import time

num_proc  = 4; num_calls = 20; sleeper   = 0.1

def SomeFunc(arg):
    time.sleep(sleeper)
    print "%5d" % (arg),
    sys.stdout.flush()     # otherwise doesn't print properly on single line

proc_pool = Pool(num_proc)
proc_pool.map( SomeFunc, range(num_calls) )

Урожайность:

   0  4  2  6   1   5   3   7   8  10  12  14  13  11   9  15  16  18  17  19

Ответ:

Из @Hayden: используйте параметр "chunksize", def map(self, func, iterable, chunksize=None).

Дополнительная информация:
chunksize определяет, сколько итераций выделяется каждому процессору за раз. Мой пример выше, например, использует chunksize of 2 ---, что означает, что каждый процессор отключается и делает свою вещь для 2 итераций функции, а затем возвращается для большего количества ( "регистрация" ). Компромисс за chunksize заключается в том, что для регистрации требуется накладные расходы, когда процессор должен синхронизироваться с другими, предлагая вам большой chunksize. С другой стороны, если у вас большие куски, то один процессор может завершить свой кусок, в то время как другой - долгое время остается, поэтому вы должны использовать small chunksize. Я полагаю, что дополнительная полезная информация заключается в том, какой диапазон существует, сколько времени может выполнять каждый вызов функции. Если они действительно все должны использовать одинаковое количество времени, более эффективно использовать большой размер куска. С другой стороны, если некоторые вызовы функций могут занимать вдвое больше, чем другие, вам нужен небольшой фрагмент, чтобы процессоры не ждали ожидания.

Для моей проблемы каждый вызов функции должен быть очень близок к тому же количеству времени (я думаю), поэтому, если я хочу, чтобы процессы вызывались по порядку, я собираюсь пожертвовать эффективностью из-за регистрации накладные расходы.

Ответы

Ответ 1

Причина, по которой это происходит, состоит в том, что каждый процесс получает предопределенное количество работы, которое необходимо выполнить в начале вызова для сопоставления, которое зависит от chunksize. Мы можем разработать стандартный chunksize, посмотрев на источник pool.map

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
  chunksize += 1

Итак, для диапазона 20 и с 4 процессами мы получим chunksize из 2.

Если мы изменим ваш код, чтобы отразить это, мы должны получить аналогичные результаты с результатами, которые вы получаете сейчас:

proc_pool.map(SomeFunc, range(num_calls), chunksize=2)

Это дает результат:

0 2 6 4 1 7 5 3 8 10 12 14 9 13 15 11 16 18 17 19

Теперь установка chunksize=1 гарантирует, что каждому процессу в пуле будет задана только одна задача за раз.

proc_pool.map(SomeFunc, range(num_calls), chunksize=1)

Это должно обеспечить разумно хорошее числовое упорядочение по сравнению с тем, когда не указывать chunksize. Например, chunksize of 1 дает результат:

0 1 2 3 4 5 6 7 9 10 8 11 13 12 15 14 16 17 19 18

Ответ 2

Как изменить map на imap:

import os
from multiprocessing import Pool
import time

num_proc = 4
num_calls = 20
sleeper = 0.1

def SomeFunc(arg):
    time.sleep(sleeper)
    print "%s %5d" % (os.getpid(), arg)
    return arg

proc_pool = Pool(num_proc)
list(proc_pool.imap(SomeFunc, range(num_calls)))

Возможно, причина в том, что значение по умолчанию chunksize для imap равно 1, поэтому оно может не работать до map.