Многопроцессорство в трубопроводе выполнено правильно
Я хотел бы знать, как многопроцессорная обработка выполняется правильно. Предполагая, что у меня есть список [1,2,3,4,5]
, сгенерированный функцией f1
, который записывается в Queue
(левый зеленый круг). Теперь я запускаю два процесса, вытягивающих из этой очереди (путем выполнения f2
в процессах). Они обрабатывают данные, скажем: удваивая значение и записывая его во вторую очередь. Теперь функция f3
считывает эти данные и распечатывает их.
![layout of the data flow]()
Внутри функций есть своего рода цикл, который пытается прочитать из очереди навсегда. Как остановить этот процесс?
Идея 1
f1
отправляет не только список, но также объект None
или объект custon, class PipelineTerminator: pass
или некоторые из них, которые просто распространяются полностью вниз. f3
теперь ждет, когда None
появится, когда он там, он выйдет из цикла. Проблема: возможно, что один из двух f2
считывает и передает None
, а другой - все еще обрабатывает число. Затем последнее значение теряется.
Идея 2
f3
составляет f1
. Таким образом, функция f1
генерирует данные и трубы, порождает процессы с помощью f2
и передает все данные. После нереста и кормления он слушает вторую трубу, просто подсчитывая и обрабатывая полученные объекты. Поскольку он знает, сколько данных подано, он может завершить процессы, выполняемые f2
. Но если целью является создание конвейера обработки, различные этапы должны быть разделяемыми. Таким образом, f1
, f2
и f3
- это разные элементы конвейера, а дорогостоящие шаги выполняются параллельно.
Идея 3
![pipeline idea 3]()
Каждая часть конвейера является функцией, эта функция генерирует процессы, которые ей нравятся, и отвечает за их управление. Он знает, сколько данных и сколько данных было возвращено (возможно, с yield
). Поэтому безопасно распространять объект None
.
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(Вместо использования потоков, возможно, можно подумать о более разумном решении.)
Итак...
Я реализовал решение, следующее за идеей 2, подачу и ожидание результатов, но это был не конвейер с независимыми функциями, подключенными друг к другу. Он работал над задачей, которую мне пришлось управлять, но ее было сложно поддерживать.
Теперь я хотел бы услышать от вас, как вы реализуете конвейеры (легко в одном процессе с функциями генератора и т.д., но с несколькими процессами?) и обычно управляете ими.
Ответы
Ответ 1
Что было бы неправильно с использованием идеи 1, но с каждым рабочим процессом (f2), когда он был создан пользовательский объект с его идентификатором? Затем f3 просто прекратит работу этого работника, пока не останется рабочий процесс.
Кроме того, новый в Python 3.2 представляет собой пакет concurrent.futures в стандартной библиотеке, который должен делать то, что вы пытаетесь "правильно" (tm) -
http://docs.python.org/dev/library/concurrent.futures.html
Возможно, можно найти backport of concurrent.futures для серии Python 2.x.
Ответ 2
С MPipe модуль, просто выполните следующее:
from mpipe import OrderedStage, Pipeline
def f1(value):
return value * 2
def f2(value):
print(value)
s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))
for task in 1, 2, 3, 4, 5, None:
p.put(task)
В приведенном выше примере выполняется 4 процесса:
- два для первого этапа (функция f1)
- один для второго этапа (функция f2)
- и еще один для основной программы, которая передает конвейер.
кулинарная книга MPipe предлагает некоторое объяснение того, как процессы закрываются внутри, используя None
в качестве последней задачи.
Чтобы запустить код, установите MPipe:
virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py
Вывод:
2
4
6
8
10
Ответ 3
Для идеи 1, как насчет:
import multiprocessing as mp
sentinel=None
def f2(inq,outq):
while True:
val=inq.get()
if val is sentinel:
break
outq.put(val*2)
def f3(outq):
while True:
val=outq.get()
if val is sentinel:
break
print(val)
def f1():
num_workers=2
inq=mp.Queue()
outq=mp.Queue()
for i in range(5):
inq.put(i)
for i in range(num_workers):
inq.put(sentinel)
workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)]
printer=mp.Process(target=f3,args=(outq,))
for w in workers:
w.start()
printer.start()
for w in workers:
w.join()
outq.put(sentinel)
printer.join()
if __name__=='__main__':
f1()
Единственное отличие от описания Идеи 1 состоит в том, что f2
выходит из while-loop
, когда получает дозорный (таким образом, завершая себя). f1
блокируется до тех пор, пока работники не будут выполнены (используя w.join()
), а затем отправит f3
дозорный сигнал (сигнализируя, что он вырвался из его while-loop
).
Ответ 4
Самый простой способ сделать это с помощью семафоров.
F1
F1 заполняет "очередь" данными, которые вы хотите обработать. Завершите конец этого нажатия, вы поместите n 'Stop' в свою очередь. n = 2 для вашего примера, но обычно количество задействованных работников.
Код будет выглядеть так:
for n in no_of_processes:
tasks.put('Stop')
F2
F2 вытаскивает из предоставленной очереди команду get
. Элемент берется из очереди и удаляется в очереди. Теперь вы можете поместить pop в цикл, обращая внимание на сигнал останова:
for elem in iter(tasks.get, 'STOP'):
do something
F3
Это немного сложно. Вы можете сгенерировать семафор в F2, который действует как сигнал к F3. Но вы не знаете, когда поступит этот сигнал, и вы можете потерять данные. Однако F3 вытаскивает данные так же, как F2, и вы можете вставить это в try... except
-statement.
queue.get
вызывает queue.Empty
, когда в очереди нет элементов. Таким образом, ваше нажатие F3 будет выглядеть так:
while control:
try:
results.get()
except queue.Empty:
control = False
С tasks
и results
очереди. Поэтому вам не нужно ничего, что еще не включено в Python.
Ответ 5
Я использую concurent.futures
и три пула, которые соединяются вместе через future.add_done_callback
. Затем я жду завершения всего процесса, вызвав shutdown
в каждом пуле.
from concurrent.futures import ProcessPoolExecutor
import time
import random
def worker1(arg):
time.sleep(random.random())
return arg
def pipe12(future):
pool2.submit(worker2, future.result()).add_done_callback(pipe23)
def worker2(arg):
time.sleep(random.random())
return arg
def pipe23(future):
pool3.submit(worker3, future.result()).add_done_callback(spout)
def worker3(arg):
time.sleep(random.random())
return arg
def spout(future):
print(future.result())
if __name__ == "__main__":
__spec__ = None # Fix multiprocessing in Spyder IPython
pool1 = ProcessPoolExecutor(2)
pool2 = ProcessPoolExecutor(2)
pool3 = ProcessPoolExecutor(2)
for i in range(10):
pool1.submit(worker1, i).add_done_callback(pipe12)
pool1.shutdown()
pool2.shutdown()
pool3.shutdown()