Совместное использование очереди результатов между несколькими процессами
Документация для модуля multiprocessing
показывает, как передать очередь в процесс, запущенный с помощью multiprocessing.Process
. Но как я могу обмениваться очередью с асинхронными рабочими процессами, начинающимися с apply_async
? Мне не нужно динамическое присоединение или что-то еще, просто чтобы рабочие (неоднократно) сообщали о своих результатах на базу.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Это не удается: RuntimeError: Queue objects should only be shared between processes through inheritance
.
Я понимаю, что это значит, и я понимаю, что совет наследует, а не требует травления/рассыпания (и всех специальных ограничений Windows). Но как я могу пройти очередь так, как это работает? Я не могу найти пример, и я попробовал несколько альтернатив, которые потерпели неудачу разными способами. Помогите пожалуйста?
Ответы
Ответ 1
Попробуйте использовать multiprocessing.Manager для управления своей очередью и также сделать ее доступной для разных работников.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
workers = pool.apply_async(worker, (33, q))
Ответ 2
multiprocessing.Pool
уже имеет общую очередь результатов, нет необходимости дополнительно привлекать Manager.Queue
. Manager.Queue
- это queue.Queue
(многопоточность-очередь) под капотом, расположенная на отдельном серверном процессе и доступная через прокси. Это добавляет дополнительные издержки по сравнению с внутренней очередью пула. В отличие от собственной обработки результатов Manager.Queue
, результаты в Manager.Queue
также не гарантируются.
Рабочие процессы не запускаются с помощью .apply_async()
, это уже происходит при .apply_async()
экземпляра Pool
. То, что запускается при вызове pool.apply_async()
является новой "работой". Рабочие процессы пула запускают multiprocessing.pool.worker
-function под капотом. Эта функция заботится об обработке новых "задач", переданных через внутренний пул Pool._inqueue
и об отправке результатов родителю через Pool._outqueue
. func
будет выполнена в multiprocessing.pool.worker
. func
только должен что-то return
и результат будет автоматически отправлен обратно родителю.
.apply_async()
немедленно (асинхронно) возвращает объект AsyncResult
(псевдоним ApplyResult
). Вам нужно вызвать .get()
(блокирует) для этого объекта, чтобы получить фактический результат. Другой вариант - зарегистрировать функцию обратного вызова, которая запускается, как только результат становится готовым.
from multiprocessing import Pool
def busy_foo(i):
"""Dummy function simulating cpu-bound work."""
for _ in range(int(10e6)): # do stuff
pass
return i
if __name__ == '__main__':
with Pool(4) as pool:
print(pool._outqueue) # DEMO
results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
# '.apply_async()' immediately returns AsyncResult (ApplyResult) object
print(results[0]) # DEMO
results = [res.get() for res in results]
print(f'result: {results}')
Пример вывода:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Примечание. Указание timeout
-parameter для .get()
не остановит фактическую обработку задачи внутри рабочего, а только разблокирует ожидающего родителя, вызвав multiprocessing.TimeoutError
.