Объединение itertools и многопроцессорности?
У меня есть массив 256x256x256
Numpy, в котором каждый элемент является матрицей. Мне нужно сделать некоторые вычисления на каждой из этих матриц, и я хочу использовать модуль multiprocessing
, чтобы ускорить процесс.
Результаты этих вычислений должны храниться в массиве 256x256x256
, таком как исходный, так что результат матрицы в элементе [i,j,k]
в исходном массиве должен быть помещен в элемент [i,j,k]
нового массив.
Чтобы сделать это, я хочу составить список, который может быть написан псевдо-иш образом как [array[i,j,k], (i, j, k)]
и передать его функции, которая будет "многопроцессорной".
Предполагая, что matrices
- список всех матриц, извлеченных из исходного массива, и myfunc
- это функция, выполняющая вычисления, код выглядит примерно так:
import multiprocessing
import numpy as np
from itertools import izip
def myfunc(finput):
# Do some calculations...
...
# ... and return the result and the index:
return (result, finput[1])
# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)
# Make function input from the matrices and the indices:
finput = izip(matrices, inds)
pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
Однако, похоже, что map_async
на самом деле создает этот огромный finput
-list: мой CPU мало что делает, но память и своп полностью поглощаются за считанные секунды, что явно не так Я хочу.
Есть ли способ передать этот огромный список функции многопроцессорности без необходимости явно создавать ее?
Или вы знаете другой способ решения этой проблемы?
Спасибо!: -)
Ответы
Ответ 1
Все методы multiprocessing.Pool.map*
потребляют итераторы полностью (демонстрационный код), как только вызывается функция. Чтобы подавать функции функции карты итератора на один кусок за раз, используйте grouper_nofill
:
def grouper_nofill(n, iterable):
'''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
'''
it=iter(iterable)
def take():
while 1: yield list(itertools.islice(it,n))
return iter(take().next,[])
chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)
PS. pool.map_async
chunksize
параметр делает что-то другое: он разбивает итерабельность на куски, а затем дает каждому фрагменту рабочий процесс, который вызывает map(func,chunk)
. Это может привести к тому, что рабочий процесс получит больше данных, если func(item)
закончится слишком быстро, но это не поможет в вашей ситуации, так как итератор все еще полностью поглощается сразу после вызова map_async
.
Ответ 2
Я столкнулся с этой проблемой. вместо этого:
res = p.map(func, combinations(arr, select_n))
do
res = p.imap(func, combinations(arr, select_n))
imap не потребляет его!
Ответ 3
Pool.map_async()
необходимо знать длину итерации для отправки работы нескольким работникам. Поскольку izip
не имеет __len__
, он сначала преобразует итерабельность в список, вызывая огромное использование памяти, которое вы испытываете.
Вы можете попытаться обойти это, создав свой собственный тестер izip
с __len__
.