Сломанная труба при использовании менеджеров многопроцессорности Python (BaseManager/SyncManager) для обмена очередью с удаленными машинами

В прошлом месяце у нас была постоянная проблема с пакетом многопроцессорности Python 2.6.x, когда мы пытались использовать его для совместного использования очереди между несколькими различными (linux) компьютерами. Я задал этот вопрос непосредственно Джесси Ноллеру, так как мы еще не нашли ничего, что разъясняет проблему в документах StackOverflow, Python, исходном коде или в другом месте в Интернете.

Наша команда инженеров не смогла решить эту проблему, и мы поставили вопрос довольно многим людям в группах пользователей python безрезультатно. Я надеялся, что кто-то может пролить некоторое понимание, так как я чувствую, что мы делаем что-то неправильное, но слишком близко к проблеме, чтобы увидеть, что это такое.

Вот симптом:

Traceback (most recent call last):
  File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
    return queue, queue.get(block=False)
  File "<string>", line 2, in get
  File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

(Я показываю, где наш код вызывает queue.get() для объекта общей очереди, размещенного менеджером, который расширяет SyncManger).

Что связано с проблемой, так это то, что если мы подключаемся к этой общей очереди на одном компьютере (позвольте этому machine A), даже из-за множества параллельных процессов, мы никогда не сталкиваемся с проблемой. Это только когда мы подключаемся к очереди (опять же, используя класс, который расширяет многопроцессорность SyncManager и в настоящее время не добавляет никаких дополнительных функций) с других компьютеров (позвольте называть эти machines B and C) и запускайте большой объем элементов в очередь и из очереди в в то же время, когда мы сталкиваемся с проблемой.

Как будто пакет многопроцессорности python обрабатывает локальные соединения (хотя они все еще используют один и тот же метод соединения manager.connect()) таким образом, который работает от machine A, но когда удаленные соединения выполняются одновременно по меньшей мере с одного machines B or C мы получаем ошибку Broken pipe.

Во всех чтениях моей команды мы решили, что проблема связана с блокировкой. Мы подумали, что мы не должны использовать Queue.Queue, но вместо этого multiprocessing.Queue, но мы переключились и проблема осталась (мы также заметили, что собственная общая очередь SyncManager - это экземпляр Queue.Queue).

Мы вытаскиваем наши волосы о том, как даже отлаживать проблему, так как трудно воспроизвести, но происходит довольно часто (много раз в день, если мы вставляем и .get() загружаем множество элементов из очереди).

Созданный нами метод get_from_queue пытается повторить получение элемента из очереди ~ 10 раз со случайными интервалами сна, но похоже, что если он терпит неудачу один раз, он провалится все десять раз (что заставило меня поверить в это. register() и .connect() в менеджере, возможно, не дают другого подключения сокета к серверу, но я не смог подтвердить это, читая документы или глядя на внутренний исходный код Python).

Может ли кто-нибудь дать представление о том, где мы можем смотреть, или как мы можем отслеживать, что на самом деле происходит?

Как мы можем начать новое соединение в случае сломанного трубопровода с помощью multiprocessing.BaseManager или multiprocessing.SyncManager?

Как мы можем предотвратить поврежденную трубу в первую очередь?

Ответы

Ответ 1

FYI. Если кто-то еще работает по этой же ошибке, после обширного консультирования с Ask Solem и Jesse Noller из команды разработчиков ядра Python, похоже, что это на самом деле ошибка в текущем python 2.6.x(и, возможно, 2.7+ и, возможно, 3.x). Они рассматривают возможные решения, и исправление, вероятно, будет включено в будущую версию Python.

Ответ 2

Я страдал от той же проблемы, даже если подключение к локальному хосту в Python 2.7.1. После дня отладки я нашел причину и обходной путь:

Причина: класс BaseProxy имеет локальное хранилище потока, которое кэширует соединение, которое повторно используется для будущих соединений, вызывая ошибки "сломанной трубы" даже при создании нового менеджера.

Обходной путь: удалите кэшированное соединение перед повторным подключением. Добавьте код в предложение try-exc в строке, которая вызывает исключение, а затем повторите его.

from multiprocessing.managers import BaseProxy

...

if address in BaseProxy._address_to_local:
    del BaseProxy._address_to_local[address][0].connection

address - это имя хоста /ip, используемое для подключения к диспетчеру многопроцессорной обработки. Если вы не указали это явно, обычно это должно быть "localhost"

Ответ 3

Также вы можете попытаться поймать исключение в дочерних процессах, чтобы не пытаться закрыть соединение UN-ожидаемо. То же самое происходило со мной, и, наконец, мне пришлось подавлять ошибки, чтобы труба не закрывалась внезапно.

Ответ 4

У меня была такая же проблема в интерактивной записной книжке Jupyter (Python 3.6.8) после прерывания многопроцессорного процесса.

Моим краткосрочным исправлением было повторное создание объектов Manager и Namespace:

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()

Из руководства:

Avoid terminating processes

Использование метода Process.terminate для остановки процесса может вызвать любые общие ресурсы (такие как блокировки, семафоры, каналы и очереди) в настоящее время используется процессом, чтобы стать сломанным или недоступен для других процессов.

Поэтому, вероятно, лучше всего рассмотреть возможность использования Process.terminate на процессах, которые никогда не используют общие ресурсы.