Максимальный предел максимальной длины очереди - 32767

Я пытаюсь написать программу Python 2.6 (OSX) с использованием многопроцессорности, и я хочу заполнить очередь более чем по умолчанию 32767 элементов.

from multiprocessing import Queue
Queue(2**15) # raises OSError

Queue(32767) работает отлично, но любое большее число (например, Queue(32768)) терпит неудачу с OSError: [Errno 22] Invalid argument

Есть ли обходной путь для этой проблемы?

Ответы

Ответ 1

Один из подходов - обернуть ваш multiprocessing.Queue специальным классом (только на стороне производителя или прозрачно с точки зрения потребителя). Используя это, вы будете помещать в очередь элементы, которые будут отправлены в объект Queue, который вы обертываете, и загружаете вещи из локальной очереди (объект Python list()) в multiprocess.Queue по мере того, как пространство становится доступным, с обработкой исключений для дросселирования при заполнении Queue.

Это, вероятно, самый простой подход, поскольку он должен иметь минимальное влияние на остальную часть вашего кода. Пользовательский класс должен вести себя точно так же, как в очереди, скрывая базовую multiprocessing.Queue за вашей абстракцией.

(Один из подходов может заключаться в том, чтобы ваш продюсер использовал потоки, один поток для управления отправкой из потока Queue в ваш multiprocessing.Queue и любые другие потоки, на самом деле просто загружающие потоки Queue).

Ответ 2

Я уже ответил на исходный вопрос, но мне хочется добавить, что Redis списки достаточно надежны и поддержка модуля Python для они чрезвычайно просты в использовании для реализации объекта, подобного Queue. Они имеют преимущество, позволяя масштабировать несколько узлов (по сети), а также только несколько процессов.

В основном, чтобы использовать те, которые вы только выбрали ключом (строкой) для вашего имени очереди, ваши продюсеры вставляют в него и заставляют ваших рабочих (потребителей задач) блокировать всплывающие окна с этого ключа.

Команды Redis BLPOP и BRPOP все берут список ключей (списки/очереди) и необязательное значение таймаута. Они возвращают кортеж (ключ, значение) или None (при тайм-ауте). Таким образом, вы можете легко написать систему, управляемую событиями, которая очень похожа на привычную структуру select() (но на гораздо более высоком уровне). Единственное, что вам нужно посмотреть - это пропустить ключи и недопустимые типы ключей (конечно, просто оберните операции с обработчиками исключений). (Если какое-либо другое приложение останавливается на вашем общем сервере Redis, удаляя ключи или заменяя ключи, которые вы использовали в качестве очередей со строками/целыми или другими типами значений... ну, у вас другая проблема в этой точке).:)

Другим преимуществом этой модели является то, что Redis сохраняет свои данные на диске. Таким образом, ваша рабочая очередь может пережить перезагрузку системы, если вы решили разрешить ее.

(Конечно, вы могли бы реализовать простую Queue в виде таблицы в SQLlite или любой другой системе SQL, если вы действительно этого хотели, просто используя какой-то индекс автоматического инкремента для последовательности и столбец для отметки каждого элемента, будучи "сделан" (потребляется), но это требует немного большей сложности, чем использование того, что Redis дает вам "из коробки" ).

Ответ 4

Работа для меня на MacOSX

>>> import Queue
>>> Queue.Queue(30000000)
<Queue.Queue instance at 0x1006035f0>