2011-12-30 3 views
19

Я пытаюсь использовать многопроцессорный пул для запуска группы процессов, каждый из которых будет запускать пул gevent из зеленых. Причина этого в том, что существует большая активность в сети, но также много активности процессора, поэтому, чтобы максимизировать пропускную способность и все мои ядра процессора, мне нужно несколько процессов и асинхронная обезьяна gevent. Я использую менеджер многопроцессорности для создания очереди, к которой процессы будут обращаться, чтобы получить данные для обработки.Gevent monkeypatching break interprocessing

Вот упрощенный фрагмент кода:

import multiprocessing 

from gevent import monkey 
monkey.patch_all(thread=False) 

manager = multiprocessing.Manager() 
q = manager.Queue() 

Вот исключение производит:

Traceback (most recent call last): 
    File "multimonkeytest.py", line 7, in <module> 
    q = manager.Queue() 
    File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp 
    token, exp = self._create(typeid, *args, **kwds) 
    File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create 
    conn = self._Client(self._address, authkey=self._authkey) 
    File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client 
    answer_challenge(c, authkey) 
    File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 409, in answer_challenge 
    message = connection.recv_bytes(256)   # reject large message 
IOError: [Errno 35] Resource temporarily unavailable 

Я считаю, что это должно быть связано с какой-то разницы между поведением нормальной розетки модуль и модуль разъема gevent.

Если я обезвреживаю в подпроцессе, очередь создается успешно, но когда подпроцесс пытается получить() из очереди, возникает очень похожее исключение. Сокет должен быть обезврежен из-за большого количества сетевых запросов в подпроцессах.

Моя версия GEvent, который я считаю, является последним:

>>> gevent.version_info 
(1, 0, 0, 'alpha', 3) 

Любые идеи?

+0

, связанный с этим: http://bugs.python.org/issue6056 – jfs

ответ

1

Ваш код работает при условии, для меня на Windows, 7.

EDIT:

Убрана предыдущий ответ, потому что я попробовал ваш код на Ubuntu 11.10 VPS, и я получаю ту же ошибку ,

Посмотрите как Eventlet have this issue too

+0

Ошибка происходит без фактических запросов, но я также протестировал ее на локальном сервере gentent bottle.py. Я получаю трассировку как для OS X Lion, так и для Ubuntu 11.04 VPS, как с Python 2.7. Какую ОС/Python/Gevent вы используете? – user964375

+0

@ user964375, Hm, на Ubuntu 11.10 (VPS), я получаю ту же ошибку. В Win7 нет ошибок. – reclosedev

+0

Протестировано на окнах xp, и код, даже без части gevent, прерывается через некоторое время. Кажется, проблема в многопроцессорном модуле. – Martin

16

monkey.patch_all(thread=False, socket=False) использование

Я столкнулся с той же проблемой в аналогичной ситуации, и отслеживали это вниз к линии 115 в gevent/monkey.py под функцией patch_socket(): _socket.socket = socket.socket. Комментирование этой строки предотвращает поломку.

Здесь gevent заменяет библиотеку stdlib socket своей собственной. multiprocessing.connection довольно широко использует библиотеку socket и, по-видимому, не допускает этого изменения.

В частности, вы увидите это в любом случае, когда импортируемый модуль выполняет вызов gevent.monkey.patch_all() без установки socket=False. В моем случае это было grequests, который сделал это, и мне пришлось переопределить исправление модуля сокета, чтобы исправить эту ошибку.

+0

Могу ли я использовать библиотеку запросов с gentent без исправления сокета? –

9

К сожалению, применение многопроцессорности в контексте gevent, как известно, вызывает проблемы. Однако ваше обоснование разумно («много активности сети, но также и много активности процессора»). Если хотите, посмотрите на http://gehrcke.de/gipc. Это предназначено в первую очередь для вашего случая использования. С помощью gipc вы можете легко создать несколько полностью управляемых gevent дочерних процессов и позволить им взаимодействовать друг с другом и/или с родителем через каналы.

Если у вас есть конкретные вопросы, вы можете вернуться ко мне.

+4

Я лично считаю, что это бездумное исправление обезьян, это проблематично. Все, что использует gevent, должно содержать очень большой предупреждающий знак о том, что он сломает. Средний программист, который еще не столкнулся с этой особой сложностью несовместимости gevent-multiprocessing, (по праву) предположил бы, что gevent не нарушает функции stdlib. Не совсем очевидно, что gevent не исправляет функции IPC (что выполнимо), когда он исправляет сокет. –

+0

Я попробовал подход gipc. Поскольку он пропускает объект Queue, альтернативная реализация с использованием труб была слишком запутана. Вместо этого см. Модуль 'нос-gevented-multiprocess' – ddotsenko

2

Если вы будете использовать оригинальную очередь, тогда код будет работать нормально даже с патчей с обезьяной.

import multiprocessing 

from gevent import monkey 
monkey.patch_all(thread=False) 

q= multiprocessing.Queue() 
1

Написал замены носа многопроцессного плагина - это один должен хорошо играть со всеми видами ума GEvent на основе заплат.

https://pypi.python.org/pypi/nose-gevented-multiprocess/

https://github.com/dvdotsenko/nose_gevent_multiprocess

  • Переключение из multiprocess.fork в обычный subprocess.popen для рабочих процессов (исправления уровня модуля ошибочно разделяет проблемы объекты для меня)
  • Switched из multiprocess.Queue в JSON-RPC через HTTP для master-to-клиентов RPC
  • Теперь это теоретически позволяет проводить тесты для множества машин