Как заставить рабочие потоки прекратить работу после завершения работы в многопоточном шаблоне производителя-потребителя?

Я пытаюсь реализовать многопоточность шаблона производителя-потребителя, используя Queue.Queue в Python 2.7. Я пытаюсь выяснить, как заставить потребителей, т.е. Рабочие потоки, остановиться, как только будет завершена вся необходимая работа.

См. Второй комментарий Мартина Джеймса к этому ответу: qaru.site/info/5751624/...

Отправьте задание "Я закончен", инструктируя потоки пулов завершить работу. Любой поток, который получает такую задачу, требует его, а затем совершает самоубийство.

Но это не работает для меня. См. Следующий код, например.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

Эта программа зависает после того, как все три сотрудника прочитали индикатор выхода, т. -1 из очереди, потому что каждый рабочий запрашивает -1 перед выходом, поэтому очередь никогда не становится пустой, а q.join() никогда не возвращается.

Я придумал следующее, но уродливое решение, в котором я посылаю индикатор -1 для каждого рабочего через очередь, чтобы каждый работник мог видеть это и совершать самоубийство. Но тот факт, что я должен отправить индикатор выхода для каждого работника, кажется немного уродливым.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

У меня есть два вопроса.

  1. Может ли работать метод отправки одного индикатора выхода для всех потоков (как объясняется во втором комментарии qaru.site/info/5751624/... от Martin James)?
  2. Если ответ на предыдущий вопрос "Нет", есть ли способ решить проблему таким образом, что мне не нужно отправлять отдельный индикатор выхода для каждого рабочего потока?

Ответы

Ответ 1

Не называйте это специальным случаем для задачи.

Вместо этого используйте событие, с неблокирующей реализацией для ваших работников.

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except queue.Empty:
            continue
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'

Ответ 2

Может ли работать метод отправки одного индикатора выхода для всех потоков (как объясняется во втором комментарии fooobar.com/questions/5751624/... от Martin James)?

Поскольку вы заметили, что это не сработает, распространение сообщения приведет к тому, что последний поток обновит очередь еще одним элементом, и поскольку вы ждете очереди, которая никогда не будет пустой, а не с кодом, который у вас есть.

Если ответ на предыдущий вопрос "Нет", есть ли способ решить проблему таким образом, что мне не нужно отправлять отдельный индикатор выхода для каждого рабочего потока?

Вы можете join к потокам вместо очереди:

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        threads.start()
    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    for t in threads:
        t.join()
    print 'done'

master()

Поскольку объяснение метода Queue объясняет, get метод get повысит выполнение после его пустого, поэтому, если вы уже знаете данные для обработки, вы можете заполнить очередь и затем спамить потоки:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        try:
            data = q.get(block=False, timeout=1)
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
        except Queue.Empty:
            break


def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

Здесь у вас живой пример

Ответ 3

Просто для полноты: вы также можете поставить в очередь сигнал остановки, который есть - (количество потоков). Каждый поток затем может увеличивать его на единицу и снова ставить в очередь, только если сигнал останова равен! = 0.

    if data < 0: # negative numbers are used to indicate that the worker should stop
        if data < -1:
            q.put(data + 1)
        # Commit suicide.
        print 'worker', n, 'is exiting'
        break

Но я лично поеду с ответом Travis Mehlinger или Daniel Sanchez.

Ответ 4

В дополнение к превосходному ответу @DanielSanchez, я предлагаю на самом деле полагаться на аналогичный механизм, как Java CountDownLatch.

Сущность,

  • вы создаете latch которая будет открываться только после того, как какой-то счетчик опустится,
  • когда защелка открывается, ожидающий ее поток будет иметь право продолжить выполнение.

  • Я сделал чрезмерно простой пример, проверьте здесь, например, класс такой защелки:

    import threading
    import Queue
    import time
    
    WORKER_COUNT = 3
    latch = threading.Condition()
    count = 3
    
    def wait():
        latch.acquire()
        while count > 0:
            latch.wait()
        latch.release()
    
    def count_down():
        global count
        latch.acquire()
        count -= 1
        if count <= 0:
            latch.notify_all()
        latch.release()
    
    def worker(n, q):
        # n - Worker ID
        # q - Queue from which to receive data
        while True:
            data = q.get()
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
            if data == -1: # -1 is used to indicate that the worker should stop
                # Requeue the exit indicator.
                q.put(-1)
                # Commit suicide.
                count_down()
                print 'worker', n, 'is exiting'
                break
    
    # master() sends data to worker() via q.  
    
    def master():
        q = Queue.Queue()
    
        # Create 3 workers.
        for i in range(WORKER_COUNT):
            t = threading.Thread(target=worker, args=(i, q))
            t.start()
    
        # Send 10 items to work on.
        for i in range(10):
            q.put(i)
            time.sleep(0.5)
    
        # Send an exit indicator for all threads to consume.
        q.put(-1)
        wait()
        print 'done'
    
    master()