Многопроцессорность и память Python
Я использую multiprocessing.imap_unordered
для выполнения вычисления в списке значений:
def process_parallel(fnc, some_list):
pool = multiprocessing.Pool()
for result in pool.imap_unordered(fnc, some_list):
for x in result:
yield x
pool.terminate()
Каждый вызов fnc
возвращает объект HUGE в результате, по дизайну. Я могу хранить N экземпляров такого объекта в ОЗУ, где N ~ cpu_count, но не намного больше (не сотни).
Теперь использование этой функции занимает слишком много памяти. Память полностью расходуется в основном процессе, а не на рабочих.
Как imap_unordered
сохранить готовые результаты? Я имею в виду результаты, которые уже были возвращены работниками, но еще не переданы пользователю. Я думал, что он умный и только вычислил их "лениво" по мере необходимости, но, видимо, нет.
Похоже, что, поскольку я не могу достаточно быстро использовать результаты process_parallel
, пул продолжает массово размещать эти огромные объекты из fnc
где-то, внутри, а затем взрывается. Есть ли способ избежать этого? Ограничить внутреннюю очередь?
Я использую Python2.7. Приветствия.
Ответы
Ответ 1
Как вы можете видеть, просмотрев соответствующий исходный файл (python2.7/multiprocessing/pool.py
), IMapUnorderedIterator использует экземпляр collections.deque
для хранения результатов. Если появился новый элемент, он добавляется и удаляется на итерации.
Как вы предположили, если другой огромный объект приходит, когда основной поток все еще обрабатывает объект, они также будут сохранены в памяти.
Что вы можете попробовать, это примерно так:
it = pool.imap_unordered(fnc, some_list)
for result in it:
it._cond.acquire()
for x in result:
yield x
it._cond.release()
Это должно привести к тому, что поток результатов задачи-результата-получателя будет заблокирован во время обработки элемента, если он пытается поместить следующий объект в deque.
Таким образом, не должно быть более двух огромных объектов в памяти.
Если это работает для вашего дела, я не знаю;)
Ответ 2
Самое простое решение, которое я могу придумать, - добавить замыкание, чтобы обернуть вашу функцию fnc
, которая будет использовать семафор для управления общим количеством одновременных заданий, которые могут выполняться за один раз (я предполагаю, что основной процесс/поток будет увеличивать семафор). Значение семафора может быть рассчитано на основе размера задания и доступной памяти.