Многопроцессор python - процесс зависает при соединении для большой очереди
Я запускаю python 2.7.3, и я заметил следующее странное поведение. Рассмотрим этот минимальный пример:
from multiprocessing import Process, Queue
def foo(qin, qout):
while True:
bar = qin.get()
if bar is None:
break
qout.put({'bar': bar})
if __name__ == '__main__':
import sys
qin = Queue()
qout = Queue()
worker = Process(target=foo,args=(qin,qout))
worker.start()
for i in range(100000):
print i
sys.stdout.flush()
qin.put(i**2)
qin.put(None)
worker.join()
Когда я зацикливаю более 10 000 или более, мой script зависает на worker.join()
. Он отлично работает, когда цикл достигает 1000.
Любые идеи?
Ответы
Ответ 1
Очередь qout
в подпроцессе заполняется. Данные, которые вы ввели в нее из foo()
, не подходят в буфере внутренних линий OS, поэтому блоки подпроцессов пытаются подобрать больше данных. Но родительский процесс не читает эти данные: он просто заблокирован, ожидая завершения подпроцесса. Это типичный тупик.
Ответ 2
Там должно быть ограничение на размер очередей. Рассмотрим следующую модификацию:
from multiprocessing import Process, Queue
def foo(qin,qout):
while True:
bar = qin.get()
if bar is None:
break
#qout.put({'bar':bar})
if __name__=='__main__':
import sys
qin=Queue()
qout=Queue() ## POSITION 1
for i in range(100):
#qout=Queue() ## POSITION 2
worker=Process(target=foo,args=(qin,))
worker.start()
for j in range(1000):
x=i*100+j
print x
sys.stdout.flush()
qin.put(x**2)
qin.put(None)
worker.join()
print 'Done!'
Это работает как есть (с qout.put
строкой qout.put
). Если вы попытаетесь сохранить все 100000 результатов, то qout
станет слишком большим: если я раскомментирую qout.put({'bar':bar})
в foo
и оставлю определение qout
в ПОЛОЖЕНИИ 1, код зависнет. Если, однако, я qout
определение qout
в ПОЛОЖЕНИЕ 2, сценарий завершится.
Короче говоря, вы должны быть осторожны, чтобы ни qin
ни qout
становились слишком большими. (См. Также: Максимальный размер очереди многопроцессорной обработки - 32767)
Ответ 3
У меня была та же проблема на python3
, когда вы пытались поместить строки в очередь общего размера около 5000 cahrs.
В моем проекте был хост-процесс, который устанавливает очередь и запускает подпроцесс, затем присоединяется. Afrer join
хост-процесс читает форму очереди. Когда подпроцесс обрабатывает слишком много данных, хост висит на join
. Я исправил это, используя следующую функцию для ожидания подпроцесса в хост-процессе:
def yield_from_process(q, p):
while p.is_alive():
p.join(timeout=1)
while True:
try:
yield q.get(block=False)
except Empty:
break
Я читаю из очереди, как только он заполняется, поэтому он никогда не становится очень большим
Ответ 4
Я пытался .get()
асинхронный работник после закрытия пула
ошибка отступа вне блока с
у меня было это
with multiprocessing.Pool() as pool:
async_results = list()
for job in jobs:
async_results.append(
pool.apply_async(
_worker_func,
(job,),
)
)
# wrong
for async_result in async_results:
yield async_result.get()
мне это нужно
with multiprocessing.Pool() as pool:
async_results = list()
for job in jobs:
async_results.append(
pool.apply_async(
_worker_func,
(job,),
)
)
# right
for async_result in async_results:
yield async_result.get()