Сломанная труба при использовании менеджеров многопроцессорности Python (BaseManager/SyncManager) для обмена очередью с удаленными машинами
В прошлом месяце у нас была постоянная проблема с пакетом многопроцессорности Python 2.6.x, когда мы пытались использовать его для совместного использования очереди между несколькими различными (linux) компьютерами. Я задал этот вопрос непосредственно Джесси Ноллеру, так как мы еще не нашли ничего, что разъясняет проблему в документах StackOverflow, Python, исходном коде или в другом месте в Интернете.
Наша команда инженеров не смогла решить эту проблему, и мы поставили вопрос довольно многим людям в группах пользователей python безрезультатно. Я надеялся, что кто-то может пролить некоторое понимание, так как я чувствую, что мы делаем что-то неправильное, но слишком близко к проблеме, чтобы увидеть, что это такое.
Вот симптом:
Traceback (most recent call last):
File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
return queue, queue.get(block=False)
File "<string>", line 2, in get
File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
(Я показываю, где наш код вызывает queue.get() для объекта общей очереди, размещенного менеджером, который расширяет SyncManger).
Что связано с проблемой, так это то, что если мы подключаемся к этой общей очереди на одном компьютере (позвольте этому machine A
), даже из-за множества параллельных процессов, мы никогда не сталкиваемся с проблемой. Это только когда мы подключаемся к очереди (опять же, используя класс, который расширяет многопроцессорность SyncManager и в настоящее время не добавляет никаких дополнительных функций) с других компьютеров (позвольте называть эти machines B and C
) и запускайте большой объем элементов в очередь и из очереди в в то же время, когда мы сталкиваемся с проблемой.
Как будто пакет многопроцессорности python обрабатывает локальные соединения (хотя они все еще используют один и тот же метод соединения manager.connect()) таким образом, который работает от machine A
, но когда удаленные соединения выполняются одновременно по меньшей мере с одного machines B or C
мы получаем ошибку Broken pipe.
Во всех чтениях моей команды мы решили, что проблема связана с блокировкой. Мы подумали, что мы не должны использовать Queue.Queue
, но вместо этого multiprocessing.Queue
, но мы переключились и проблема осталась (мы также заметили, что собственная общая очередь SyncManager - это экземпляр Queue.Queue).
Мы вытаскиваем наши волосы о том, как даже отлаживать проблему, так как трудно воспроизвести, но происходит довольно часто (много раз в день, если мы вставляем и .get() загружаем множество элементов из очереди).
Созданный нами метод get_from_queue
пытается повторить получение элемента из очереди ~ 10 раз со случайными интервалами сна, но похоже, что если он терпит неудачу один раз, он провалится все десять раз (что заставило меня поверить в это. register() и .connect() в менеджере, возможно, не дают другого подключения сокета к серверу, но я не смог подтвердить это, читая документы или глядя на внутренний исходный код Python).
Может ли кто-нибудь дать представление о том, где мы можем смотреть, или как мы можем отслеживать, что на самом деле происходит?
Как мы можем начать новое соединение в случае сломанного трубопровода с помощью multiprocessing.BaseManager
или multiprocessing.SyncManager
?
Как мы можем предотвратить поврежденную трубу в первую очередь?
Ответы
Ответ 1
FYI. Если кто-то еще работает по этой же ошибке, после обширного консультирования с Ask Solem и Jesse Noller из команды разработчиков ядра Python, похоже, что это на самом деле ошибка в текущем python 2.6.x(и, возможно, 2.7+ и, возможно, 3.x). Они рассматривают возможные решения, и исправление, вероятно, будет включено в будущую версию Python.
Ответ 2
Я страдал от той же проблемы, даже если подключение к локальному хосту в Python 2.7.1. После дня отладки я нашел причину и обходной путь:
Причина: класс BaseProxy имеет локальное хранилище потока, которое кэширует соединение, которое повторно используется для будущих соединений, вызывая ошибки "сломанной трубы" даже при создании нового менеджера.
Обходной путь: удалите кэшированное соединение перед повторным подключением. Добавьте код в предложение try-exc в строке, которая вызывает исключение, а затем повторите его.
from multiprocessing.managers import BaseProxy
...
if address in BaseProxy._address_to_local:
del BaseProxy._address_to_local[address][0].connection
address
- это имя хоста /ip, используемое для подключения к диспетчеру многопроцессорной обработки. Если вы не указали это явно, обычно это должно быть "localhost"
Ответ 3
Также вы можете попытаться поймать исключение в дочерних процессах, чтобы не пытаться закрыть соединение UN-ожидаемо. То же самое происходило со мной, и, наконец, мне пришлось подавлять ошибки, чтобы труба не закрывалась внезапно.
Ответ 4
У меня была такая же проблема в интерактивной записной книжке Jupyter (Python 3.6.8) после прерывания многопроцессорного процесса.
Моим краткосрочным исправлением было повторное создание объектов Manager
и Namespace
:
from multiprocessing import Manager
mgr = Manager()
ns = mgr.Namespace()
Из руководства:
Avoid terminating processes
Использование метода Process.terminate для остановки процесса может вызвать любые общие ресурсы (такие как блокировки, семафоры, каналы и очереди) в настоящее время используется процессом, чтобы стать сломанным или недоступен для других процессов.
Поэтому, вероятно, лучше всего рассмотреть возможность использования Process.terminate на процессах, которые никогда не используют общие ресурсы.