Как заставить рабочие потоки прекратить работу после завершения работы в многопоточном шаблоне производителя-потребителя?
Я пытаюсь реализовать многопоточность шаблона производителя-потребителя, используя Queue.Queue в Python 2.7. Я пытаюсь выяснить, как заставить потребителей, т.е. Рабочие потоки, остановиться, как только будет завершена вся необходимая работа.
См. Второй комментарий Мартина Джеймса к этому ответу: qaru.site/info/5751624/...
Отправьте задание "Я закончен", инструктируя потоки пулов завершить работу. Любой поток, который получает такую задачу, требует его, а затем совершает самоубийство.
Но это не работает для меня. См. Следующий код, например.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Эта программа зависает после того, как все три сотрудника прочитали индикатор выхода, т. -1
из очереди, потому что каждый рабочий запрашивает -1
перед выходом, поэтому очередь никогда не становится пустой, а q.join()
никогда не возвращается.
Я придумал следующее, но уродливое решение, в котором я посылаю индикатор -1
для каждого рабочего через очередь, чтобы каждый работник мог видеть это и совершать самоубийство. Но тот факт, что я должен отправить индикатор выхода для каждого работника, кажется немного уродливым.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
У меня есть два вопроса.
- Может ли работать метод отправки одного индикатора выхода для всех потоков (как объясняется во втором комментарии qaru.site/info/5751624/... от Martin James)?
- Если ответ на предыдущий вопрос "Нет", есть ли способ решить проблему таким образом, что мне не нужно отправлять отдельный индикатор выхода для каждого рабочего потока?
Ответы
Ответ 1
Не называйте это специальным случаем для задачи.
Вместо этого используйте событие, с неблокирующей реализацией для ваших работников.
stopping = threading.Event()
def worker(n, q, timeout=1):
# run until the master thread indicates we're done
while not stopping.is_set():
try:
# don't block indefinitely so we can return to the top
# of the loop and check the stopping event
data = q.get(True, timeout)
# raised by q.get if we reach the timeout on an empty queue
except queue.Empty:
continue
q.task_done()
def master():
...
print 'waiting for workers to finish'
q.join()
stopping.set()
print 'done'
Ответ 2
Может ли работать метод отправки одного индикатора выхода для всех потоков (как объясняется во втором комментарии fooobar.com/questions/5751624/... от Martin James)?
Поскольку вы заметили, что это не сработает, распространение сообщения приведет к тому, что последний поток обновит очередь еще одним элементом, и поскольку вы ждете очереди, которая никогда не будет пустой, а не с кодом, который у вас есть.
Если ответ на предыдущий вопрос "Нет", есть ли способ решить проблему таким образом, что мне не нужно отправлять отдельный индикатор выхода для каждого рабочего потока?
Вы можете join
к потокам вместо очереди:
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
for t in threads:
threads.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
for t in threads:
t.join()
print 'done'
master()
Поскольку объяснение метода Queue объясняет, get
метод get
повысит выполнение после его пустого, поэтому, если вы уже знаете данные для обработки, вы можете заполнить очередь и затем спамить потоки:
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
try:
data = q.get(block=False, timeout=1)
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
except Queue.Empty:
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Send 10 items to work on.
for i in range(10):
q.put(i)
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Здесь у вас живой пример
Ответ 3
Просто для полноты: вы также можете поставить в очередь сигнал остановки, который есть - (количество потоков). Каждый поток затем может увеличивать его на единицу и снова ставить в очередь, только если сигнал останова равен! = 0.
if data < 0: # negative numbers are used to indicate that the worker should stop
if data < -1:
q.put(data + 1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
Но я лично поеду с ответом Travis Mehlinger
или Daniel Sanchez
.
Ответ 4
В дополнение к превосходному ответу @DanielSanchez, я предлагаю на самом деле полагаться на аналогичный механизм, как Java CountDownLatch.
Сущность,
- вы создаете
latch
которая будет открываться только после того, как какой-то счетчик опустится, -
когда защелка открывается, ожидающий ее поток будет иметь право продолжить выполнение.
-
Я сделал чрезмерно простой пример, проверьте здесь, например, класс такой защелки:
import threading
import Queue
import time
WORKER_COUNT = 3
latch = threading.Condition()
count = 3
def wait():
latch.acquire()
while count > 0:
latch.wait()
latch.release()
def count_down():
global count
latch.acquire()
count -= 1
if count <= 0:
latch.notify_all()
latch.release()
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
count_down()
print 'worker', n, 'is exiting'
break
# master() sends data to worker() via q.
def master():
q = Queue.Queue()
# Create 3 workers.
for i in range(WORKER_COUNT):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
wait()
print 'done'
master()