Python Multiprocessing.Pool ленивая итерация
Мне интересно, как класс python Multiprocessing.Pool работает с картой, imap и map_async. Моя проблема заключается в том, что я хочу отобразить на итераторе, который создает объекты с большой памятью, и не хочет, чтобы все эти объекты генерировались в память одновременно. Я хотел посмотреть, будут ли различные функции map() вызывать мой итератор сухим или интеллектуально вызвать функцию next() только тогда, когда дочерние процессы медленно продвигаются, поэтому я взломал некоторые тесты как таковые:
def g():
for el in xrange(100):
print el
yield el
def f(x):
time.sleep(1)
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
go = g()
g2 = pool.imap(f, go)
g2.next()
И так далее с map, imap и map_async. Это самый вопиющий пример, так как просто вызов next() за один раз на g2 выводит все мои элементы из моего генератора g(), тогда как ifap делает это "лениво", я ожидал бы, что он будет только называть go.next() один раз и, следовательно, распечатать только "1".
Может кто-то прояснить, что происходит, и если есть способ, чтобы пул процессов "лениво" оценивал итератор по мере необходимости?
Спасибо,
Гейб
Ответы
Ответ 1
Посмотрите сначала на конец программы.
В многопроцессорном модуле используется atexit
для вызова multiprocessing.util._exit_function
, когда заканчивается ваша программа.
Если вы удалите g2.next()
, ваша программа быстро закончится.
_exit_function
в конечном итоге вызывает Pool._terminate_pool
. Основной поток изменяет состояние pool._task_handler._state
от RUN
до TERMINATE
. Тем временем поток pool._task_handler
зацикливается на Pool._handle_tasks
и сворачивается, когда он достигает условия
if thread._state:
debug('task handler found thread._state != RUN')
break
(См./usr/lib/python2.6/multiprocessing/pool.py)
Это то, что останавливает обработчик задачи от полного потребления вашего генератора, g()
. Если вы посмотрите Pool._handle_tasks
, вы увидите
for i, task in enumerate(taskseq):
...
try:
put(task)
except IOError:
debug('could not put task on queue')
break
Это код, который потребляет ваш генератор. (taskseq
не совсем ваш генератор, но как taskseq
потребляется, так и ваш генератор.)
В отличие от этого, когда вы вызываете g2.next()
, основной поток вызывает IMapIterator.next
и ждет, когда он достигнет self._cond.wait(timeout)
.
Что главный поток ждет вместо
вызов _exit_function
- это то, что позволяет потоку обработчика задачи нормально работать, что означает, что он полностью потребляет генератор в качестве put
задач в worker
s 'inqueue
в функции Pool._handle_tasks
.
Суть в том, что все функции отображения Pool
потребляют всю итерабельность, которую он задает. Если вы хотите использовать генератор в кусках, вы можете сделать это вместо:
import multiprocessing as mp
import itertools
import time
def g():
for el in xrange(50):
print el
yield el
def f(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
pool = mp.Pool(processes=4) # start 4 worker processes
go = g()
result = []
N = 11
while True:
g2 = pool.map(f, itertools.islice(go, N))
if g2:
result.extend(g2)
time.sleep(1)
else:
break
print(result)
Ответ 2
У меня тоже была эта проблема, и я был разочарован, узнав, что карта потребляет все ее элементы. Я закодировал функцию, которая лениво использует итератор, используя тип данных очереди в многопроцессорной обработке. Это похоже на то, что @unutbu описывает в комментарии к его ответу, но, как он указывает, страдает отсутствием механизма обратного вызова для повторной загрузки очереди. Вместо этого тип данных Queue предоставляет параметр таймаута, и я использовал 100 миллисекунд для хорошего эффекта.
from multiprocessing import Process, Queue, cpu_count
from Queue import Full as QueueFull
from Queue import Empty as QueueEmpty
def worker(recvq, sendq):
for func, args in iter(recvq.get, None):
result = func(*args)
sendq.put(result)
def pool_imap_unordered(function, iterable, procs=cpu_count()):
# Create queues for sending/receiving items from iterable.
sendq = Queue(procs)
recvq = Queue()
# Start worker processes.
for rpt in xrange(procs):
Process(target=worker, args=(sendq, recvq)).start()
# Iterate iterable and communicate with worker processes.
send_len = 0
recv_len = 0
itr = iter(iterable)
try:
value = itr.next()
while True:
try:
sendq.put((function, value), True, 0.1)
send_len += 1
value = itr.next()
except QueueFull:
while True:
try:
result = recvq.get(False)
recv_len += 1
yield result
except QueueEmpty:
break
except StopIteration:
pass
# Collect all remaining results.
while recv_len < send_len:
result = recvq.get()
recv_len += 1
yield result
# Terminate worker processes.
for rpt in xrange(procs):
sendq.put(None)
Это решение имеет то преимущество, что не требует пакетных запросов к Pool.map. Один отдельный работник не может заблокировать других от прогресса. YMMV. Обратите внимание, что вы можете использовать другой объект для прекращения сигнала для рабочих. В этом примере я использовал None.
Протестировано на "Python 2.7 (r27: 82525, 4 июля 2010, 09:01:59) [MSC v.1500 32 бит (Intel)] на win32"
Ответ 3
Что вы хотите реализовать в пакете NuMap с веб-сайта:
NuMap является параллельным (основанным на потоке или процессом, локальным или удаленным) буферизованный, многозадачный, itertools.imap или многопроцессорный. Pool.imap замена функций. Как и imap, он вычисляет функцию на элементах последовательность или итерируемость, и она делает это лениво. Лень можно настроить с помощью аргументов "шага" и "буфера".