Как использовать многопроцессорную очередь в Python?
У меня много проблем, пытаясь понять, как многопроцессорная очередь работает на python и как ее реализовать. Допустим, у меня есть два модуля python, которые обращаются к данным из общего файла, позволяют называть эти два модуля писателем и читателем. Мой план состоит в том, чтобы как читатель, так и писатель отправлял запросы в две отдельные очереди многопроцессорности, а затем третий процесс выдавал эти запросы в цикле и выполнял их как таковые.
Моя основная проблема заключается в том, что я действительно не знаю, как правильно реализовать multiprocessing.queue, вы не можете реально создать объект для каждого процесса, так как они будут отдельными очередями, как вы убедитесь, что все процессы связаны с общим очереди (или в этом случае очереди)
Ответы
Ответ 1
Моя главная проблема в том, что я действительно не знаю, как правильно реализовать multiprocessing.queue, вы не можете создать экземпляр объекта для каждого процесса, так как они будут отдельными очередями, как вы убедитесь, что все процессы относятся к общей очереди (или в этом случае очереди)
Это простой пример того, как читатель и писатель разделяют одну очередь... Автор посылает читателю кучу целых чисел; когда у автора заканчиваются цифры, он отправляет сообщение "ГОТОВО", что позволяет читателю выйти из цикла чтения.
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
Ответ 2
в " from queue import Queue
" нет модуля, называемого queue
, вместо этого следует использовать multiprocessing
. Поэтому он должен выглядеть как " from multiprocessing import Queue
"
Ответ 3
Здесь совершенно простое использование multiprocessing.Queue
и multiprocessing.Process
которое позволяет вызывающим сторонам отправлять "событие" плюс аргументы в отдельный процесс, который отправляет событие методу "do_" в процессе. (Python 3. 4+)
import multiprocessing as mp
import collections
Msg = collections.namedtuple('Msg', ['event', 'args'])
class BaseProcess(mp.Process):
"""A process backed by an internal queue for simple one-way message passing.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = mp.Queue()
def send(self, event, *args):
"""Puts the event and args as a 'Msg' on the queue
"""
msg = Msg(event, args)
self.queue.put(msg)
def dispatch(self, msg):
event, args = msg
handler = getattr(self, "do_%s" % event, None)
if not handler:
raise NotImplementedError("Process has no handler for [%s]" % event)
handler(*args)
def run(self):
while True:
msg = self.queue.get()
self.dispatch(msg)
Использование:
class MyProcess(BaseProcess):
def do_helloworld(self, arg1, arg2):
print(arg1, arg2)
if __name__ == "__main__":
process = MyProcess()
process.start()
process.send('helloworld', 'hello', 'world')
send
происходит в родительском процессе, do_*
происходит в дочернем процессе.
Я исключил любую обработку исключений, которая явно прервала бы цикл выполнения и вышла из дочернего процесса. Вы также можете настроить его, переопределив run
чтобы контролировать блокировку или что-то еще.
Это действительно полезно только в ситуациях, когда у вас есть один рабочий процесс, но я думаю, что это улокальный ответ на этот вопрос, чтобы продемонстрировать общий сценарий с чуть большей объектно-ориентированной