Gevent monkeypatching прерывает многопроцессорность
Я пытаюсь использовать многопроцессорный пул для запуска группы процессов, каждый из которых будет запускать пул gevent из greenlets. Причина этого в том, что существует много активности в сети, но также много активности процессора, поэтому, чтобы максимизировать пропускную способность и все мои ядра процессора, мне нужно несколько процессов и gentent async monkey patching. Я использую многопроцессорный менеджер для создания очереди, к которой процессы будут обращаться, чтобы получить данные для обработки.
Вот упрощенный фрагмент кода:
import multiprocessing
from gevent import monkey
monkey.patch_all(thread=False)
manager = multiprocessing.Manager()
q = manager.Queue()
Вот его исключение:
Traceback (most recent call last):
File "multimonkeytest.py", line 7, in <module>
q = manager.Queue()
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
conn = self._Client(self._address, authkey=self._authkey)
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
answer_challenge(c, authkey)
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 409, in answer_challenge
message = connection.recv_bytes(256) # reject large message
IOError: [Errno 35] Resource temporarily unavailable
Я считаю, что это должно быть связано с некоторой разницей между поведением нормального модуля сокета и модулем сокета gevent.
Если я обезвреживаю в подпроцессе, очередь создается успешно, но когда подпроцесс пытается получить() из очереди, возникает очень подобное исключение. Сокет должен быть обезврежен из-за большого количества сетевых запросов в подпроцессах.
Моя версия gevent, которая, как мне кажется, самая последняя:
>>> gevent.version_info
(1, 0, 0, 'alpha', 3)
Любые идеи?
Ответы
Ответ 1
использовать monkey.patch_all(thread=False, socket=False)
Я столкнулся с той же проблемой в аналогичной ситуации и проследил ее до строки 115 в gevent/monkey.py
в функции patch_socket()
: _socket.socket = socket.socket
. Комментирование этой строки предотвращает поломку.
Здесь gevent заменяет библиотеку stdlib socket
своей собственной. multiprocessing.connection
использует библиотеку socket
довольно широко и, по-видимому, не допускает этого изменения.
В частности, вы увидите это в любом сценарии, когда импортируемый модуль выполняет вызов gevent.monkey.patch_all()
без установки socket=False
. В моем случае это было grequests
, и я должен был переопределить исправление модуля сокета, чтобы исправить эту ошибку.
Ответ 2
Известно, что применение многопроцессорности в контексте gevent вызывает проблемы. Однако ваше обоснование разумно ( "много активности сети, но также и много активности процессора" ). Если хотите, просмотрите http://gehrcke.de/gipc. Это предназначено в первую очередь для вашего случая использования. С помощью gipc вы можете легко создать несколько полностью управляемых gevent дочерних процессов и позволить им взаимодействовать друг с другом и/или с родителем через каналы.
Если у вас есть конкретные вопросы, вы можете вернуться ко мне.
Ответ 3
Если вы будете использовать исходную очередь, то код будет работать нормально даже с патчей-патчей с обезьяной.
import multiprocessing
from gevent import monkey
monkey.patch_all(thread=False)
q= multiprocessing.Queue()
Ответ 4
Ваш предоставленный код работает для меня в Windows 7.
EDIT:
Удален предыдущий ответ, потому что я пробовал свой код на Ubuntu 11.10 VPS, и я получаю ту же ошибку.
Посмотрите, как У этой проблемы тоже есть эта проблема
Ответ 5
Написал новый плагин Nose Multiprocess - он должен хорошо играть со всеми типами сумасшедших патчей на основе Gevent.
https://pypi.python.org/pypi/nose-gevented-multiprocess/
https://github.com/dvdotsenko/nose_gevent_multiprocess
- Переключает с
multiprocess.fork
в обычный subprocess.popen
для рабочих процессов (исправляет проблемы с ошибочно разделяемыми модульными уровнями для меня)
- Переключено с multiprocess.Queue на JSON-RPC через HTTP для master-to-клиентов RPC
- Теперь это теоретически позволяет распределять тесты на несколько машин.