Сбрасывание многопроцессорности.

Я хочу сбросить multiprocessing.Queue в список. Для этой задачи я написал следующую функцию:

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items \
            left" % (len(result), current_size))
            # END DEBUG CODE

            return result

Но по какой-то причине это не работает.

Обратите внимание на следующий сеанс оболочки:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 

Что здесь происходит? Почему не все предметы сбрасываются?

Ответы

Ответ 1

Попробуйте следующее:

import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)

Многопроцессорные очереди имеют внутренний буфер, который имеет поток фидера, который вытягивает работу с буфера и сбрасывает его в трубу. Если не все объекты были сброшены, я мог бы увидеть случай, когда Empty был поднят досрочно. Использование дозорного устройства для указания конца очереди является безопасным (и надежным). Кроме того, использование итерации iter (get, sentinel) лучше, чем полагаться на Empty.

Мне не нравится, что он может поднимать пустой из-за времени очистки (я добавил time.sleep(.1), чтобы разрешить контекстный переключатель в поток фидера, вам может и не понадобиться, он работает без него - он привычка выпускать GIL).

Ответ 2

В некоторых ситуациях мы уже вычислили все, и хотим просто конвертировать очередь.

shared_queue = Queue()
shared_queue_list = []
...
join() #All process are joined
while shared_queue.qsize() != 0:
    shared_queue_list.append(shared_queue.get())

Теперь shared_queue_list имеет результаты, преобразованные в список.