Queue vs JoinableQueue в Python
В Python при использовании многопроцессорного модуля есть 2 вида очередей:
В чем разница между ними?
Queue
from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue
JoinableQueue
from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion
Ответы
Ответ 1
JoinableQueue
имеет методы join()
и task_done()
, которые Queue
hasn ' т.
многопроцессорность класса .Queue([maxsize])
Возвращает очередь процессов, реализованную с использованием канала и нескольких блокировок/семафоров. Когда процесс сначала помещает элемент в очередь, запускается поток фидера, который переносит объекты из буфера в канал.
Обычные Queue.Empty и Queue.Full исключения из стандартного модуля Queues для очереди увеличены до тайм-аутов сигнала.
Queue реализует все методы Queue.Queue, за исключением task_done() и join().
многопроцессорный класс .JoinableQueue([maxsize])
JoinableQueue, подкласс Queue, представляет собой очередь, которая дополнительно имеет методы task_done() и join().
task_done()
Укажите, что ранее заданная задача завершена. Используется потоками потребителей в очереди. Для каждого get(), используемого для извлечения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.
Если соединение join() блокируется в настоящий момент, оно будет возобновлено, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещен() в очередь, был получен вызов task_done()).
Повышает значение ValueError, если вызвано больше раз, чем были помещены в очередь.
Join()
Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается, когда элемент добавляется в очередь. Счетчик прекращается всякий раз, когда потребительский поток вызывает task_done(), чтобы указать, что элемент был восстановлен, и все работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокирует.
Если вы используете JoinableQueue
, тогда вы должны вызывать JoinableQueue.task_done()
для каждой задачи, удаленной из очереди, или семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполнить, создавая исключение.
Ответ 2
Исходя из документации, трудно быть уверенным, что Queue
на самом деле пуста. С JoinableQueue
вы можете дождаться освобождения очереди, вызвав q.join()
. В тех случаях, когда вы хотите выполнить работу в отдельных пакетах, когда вы делаете что-то дискретное в конце каждого пакета, это может быть полезно.
Например, возможно, вы обрабатываете 1000 элементов за раз через очередь, а затем отправляете push-уведомление пользователю, что вы завершили другой пакет. Это было бы сложно реализовать с обычной Queue
.
Это может выглядеть примерно так:
import multiprocessing as mp
BATCH_SIZE = 1000
STOP_VALUE = 'STOP'
def consume(q):
for item in iter(q.get, STOP_VALUE):
try:
process(item)
# Be very defensive about errors since they can corrupt pipes.
except Exception as e:
logger.error(e)
finally:
q.task_done()
q = mp.JoinableQueue()
with mp.Pool() as pool:
# Pull items off queue as fast as we can whenever they're ready.
for _ in range(mp.cpu_count()):
pool.apply_async(consume, q)
for i in range(0, len(URLS), BATCH_SIZE):
# Put 'BATCH_SIZE' items in queue asynchronously.
pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=q.put)
# Wait for the queue to empty.
q.join()
notify_users()
# Stop the consumers so we can exit cleanly.
for _ in range(mp.cpu_count()):
q.put(STOP_VALUE)
NB: я на самом деле не запускал этот код. Если вы вытаскиваете предметы из очереди быстрее, чем ставите их, вы можете закончить раньше. В этом случае этот код отправляет обновление по крайней мере каждые 1000 пунктов, а может и чаще. Для обновлений прогресса это, вероятно, хорошо. Если важно, чтобы было ровно 1000, вы можете использовать mp.Value('i', 0)
и проверять, что оно mp.Value('i', 0)
1000, когда освобождается ваше join
.