Многопроцессорная очередь в Python
Я пытаюсь использовать очередь с библиотекой многопроцессорности в Python. После выполнения кода ниже (операторы печати работают), но процессы не завершаются после того, как я вызываю соединение в очереди, и все еще живы. Как я могу завершить оставшиеся процессы?
Спасибо!
def MultiprocessTest(self):
print "Starting multiprocess."
print "Number of CPUs",multiprocessing.cpu_count()
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = multiprocessing.JoinableQueue()
for i in range(num_procs):
p = multiprocessing.Process(target=worker)
p.daemon = True
p.start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
print "q close"
q.join()
#q.close()
print "Finished everything...."
print "num active children:",multiprocessing.active_children()
Ответы
Ответ 1
попробуйте следующее:
import multiprocessing
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
for item in iter( q.get, None ):
do_work(item)
q.task_done()
q.task_done()
q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
procs.append( multiprocessing.Process(target=worker) )
procs[-1].daemon = True
procs[-1].start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
q.join()
for p in procs:
q.put( None )
q.join()
for p in procs:
p.join()
print "Finished everything...."
print "num active children:", multiprocessing.active_children()
Ответ 2
Вашим работникам нужен дозорный, чтобы прекратить работу, или они просто сидят на блокирующих чтениях. Обратите внимание, что использование sleep на Q вместо соединения на P позволяет отображать информацию о состоянии и т.д.
Мой предпочтительный шаблон:
def worker(q,nameStr):
print 'Worker %s started' %nameStr
while True:
item = q.get()
if item is None: # detect sentinel
break
print '%s processed %s' % (nameStr,item) # do something useful
q.task_done()
print 'Worker %s Finished' % nameStr
q.task_done()
q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
nameStr = 'Worker_'+str(i)
p = multiprocessing.Process(target=worker, args=(q,nameStr))
p.daemon = True
p.start()
procs.append(p)
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
for i in range(num_procs):
q.put(None) # send termination sentinel, one for each process
while not q.empty(): # wait for processing to finish
sleep(1) # manage timeouts and status updates etc.
Ответ 3
Вы должны очистить очередь перед присоединением к процессу, но q.empty() ненадежно.
Лучший способ очистить очередь - подсчитать количество успешных get или loop до тех пор, пока вы не получите дозорное значение, точно так же, как сокет с надежной сетью.
Ответ 4
Код ниже может не очень уместно, но я отправляю его для комментариев/отзывов, чтобы мы могли учиться вместе. Спасибо!
import multiprocessing
def boss(q,nameStr):
source = range(1024)
for item in source:
q.put(nameStr+' '+str(item))
q.put(None) # send termination sentinel, one for each process
def worker(q,nameStr):
while True:
item = q.get()
if item is None: # detect sentinel
break
print '%s processed %s' % (nameStr,item) # do something useful
q = multiprocessing.Queue()
procs = []
num_procs = 4
for i in range(num_procs):
nameStr = 'ID_'+str(i)
p = multiprocessing.Process(target=worker, args=(q,nameStr))
procs.append(p)
p = multiprocessing.Process(target=boss, args=(q,nameStr))
procs.append(p)
for j in procs:
j.start()
for j in procs:
j.join()
Ответ 5
Вот метод без досмотра для относительно простого случая, когда вы ставите ряд задач на JoinableQueue
, затем запускаете рабочие процессы, которые потребляют задачи и выходят, как только они читают очередь "сухая". Хитрость заключается в использовании JoinableQueue.get_nowait()
вместо get()
. get_nowait()
, как следует из названия, пытается получить значение из очереди неблокирующим образом, и если ничего не получится, возникает исключение queue.Empty
. Рабочий обрабатывает это исключение, выходя из него.
Простейший код, иллюстрирующий принцип:
import multiprocessing as mp
from queue import Empty
def worker(q):
while True:
try:
work = q.get_nowait()
# ... do something with `work`
q.task_done()
except Empty:
break # completely done
# main
worknum = 4
jq = mp.JoinableQueue()
# fill up the task queue
# let assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
jq.put(task)
procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
p.start()
for p in procs:
p.join()
Преимущество состоит в том, что вам не нужно помещать "ядовитые таблетки" в очередь, чтобы код был немного короче.
ВАЖНО: в более сложных ситуациях, когда производители и потребители используют одну и ту же очередь "чередующимися" способами, и работникам, возможно, придется ждать новых задач, подход "ядовитая таблетка" должен быть использован. Мое предложение выше - это простые случаи, когда работники "знают", что если очередь задач пуста, то больше нет точки.