2015-11-04 5 views
1

Я пытаюсь получить некоторый код, работающий там, где я могу реализовать вход в многопоточную программу с помощью gevent. То, что я хотел бы сделать, - настроить пользовательские обработчики ведения журнала, чтобы помещать события журнала в очередь, в то время как процесс прослушивания постоянно следит за тем, чтобы новые события журнала обрабатывались надлежащим образом. Я делал это в прошлом с Multiprocessing, но никогда с Gevent.Python Gevent Shared Queue (процесс прослушивания)

У меня проблема, когда программа попасться в бесконечном цикле (процесс слушателя), а не позволяя другим потокам «работают» ...

В идеале, после того, как рабочие процессы закончили, я могу передать произвольное значение процессу слушателя, чтобы сообщить ему разбить цикл, а затем объединить все процессы вместе. Вот то, что я до сих пор:

import gevent 
from gevent.pool import Pool 
import Queue 
import random 
import time 

def listener(q): 
    while True: 
     if not q.empty(): 
      num = q.get() 
      print "The number is: %s" % num 
      if num <= 100: 
       print q.get() 
      # got passed 101, break out 
      else: 
       break 
     else: 
      continue 
def worker(pid,q): 
    if pid == 0: 
     listener(q) 
    else: 
     gevent.sleep(random.randint(0,2)*0.001) 
     num = random.randint(1,100) 
     q.put(num) 

def main(): 
    q = Queue.Queue() 
    all_threads = [] 
    all_threads = [gevent.spawn(worker, pid,q) for pid in xrange(10)] 
    gevent.wait(all_threads[1:]) 
    q.put(101) 
    gevent.joinall(all_threads) 

if __name__ == '__main__': 
    main() 

Как я уже говорил, программа, кажется, становится зацикливаться на этом первом процессе и не позволяет другим работникам делать свое дело. Я также пробовал порождать процесс слушателя полностью отдельно (на самом деле, как бы я это сделал), но это, похоже, не работает, поэтому я пробовал этот путь.

Любая помощь будет оценена по достоинству, почувствуйте, что я, вероятно, просто пропустил что-то очевидное о заднем конце Gevent.

Благодаря

ответ

1

Первая проблема заключается в том, что слушатель никогда не приносит, если очередь изначально пуст. Первой задачей, которую вы создаете, является ваш слушатель. Когда он начнется, появится while True:, q будет пустым, поэтому вы перейдете к ветке else, которая только что продолжается, зациклившись до начала цикла while, а затем q все еще пуст. Таким образом, вы просто сидите в первом потоке, постоянно проверяя, что q пуст.

Главное, что gevent не использует «родные» потоки или процессы. В отличие от «реальных» потоков, которые в любой момент можно переключать на что-то за кулисами (например, планировщик ОС), gevent использует «зеленые», что требует, чтобы вы что-то делали для «управления» для другой задачи. Это что-то вроде того, что, по мнению gevent, блокирует, например, чтение из сети, диска или использование одной из операций блокировки gevent.

Одним из грубых исправлений будет запуск вашего слушателя, когда pid == 9, а не 0. Запустив его последним, будут элементы в q, и он войдет в основную ветвь. Недостатком является то, что это не устраняет логическую проблему, поэтому в первый раз, когда очередь пуста, вы снова застреваете в своем бесконечном цикле.

Более правильным решением было бы поставить gevent.sleep() вместо continue. sleep - это операция блокировки, поэтому ваши другие задачи получат возможность запуска. Без аргументов он не ждет времени, но все же дает gevent возможность решить переключиться на другую задачу, если она готова к запуску. Это все еще не очень эффективно, хотя, как если бы Очередь была пуста, она будет тратить много бессмысленного времени на проверку этого снова и снова и просить снова запустить, как только это будет возможно. более медленный, чем значение по умолчанию 0, будет более эффективным, но будет задерживать обработку сообщений журнала.

Однако вы можете воспользоваться тем фактом, что многие типы gevent, такие как Queue, могут использоваться более путинскими способами и сделать ваш код намного проще и понятнее, а также более эффективным.

import gevent 
from gevent.queue import Queue 

def listener(q): 
    for msg in q: 
     print "the number is %d" % msg 

def worker(pid,q): 
    gevent.sleep(random.randint(0,2)*0.001) 
    num = random.randint(1,100) 
    q.put(num) 

def main(): 
    q = Queue() 
    listener_task = gevent.spawn(listener, q) 
    worker_tasks = [gevent.spawn(worker, pid, q) for pid in xrange(1, 10)] 
    gevent.wait(worker_tasks) 
    q.put(StopIteration) 
    gevent.join(listener_task) 

Здесь Queue может работать в качестве итератора в for цикле. Пока есть сообщения, он получит элемент, запустит цикл, а затем ждет другого элемента.Если предметов нет, они просто будут блокироваться и перемещаться, пока не появится следующий. Так как он блокируется, gevent переключится на одну из ваших других задач для запуска, избегая проблемы с бесконечным циклом, которые имеет ваш пример кода.

Поскольку эта версия использует Queue в качестве итератора цикла, также есть автоматически хорошее значение дозорного значения, которое мы можем поставить в очередь, чтобы заставить задачу слушателя выйти. Если цикл for получает StopIteration из своего итератора, он будет полностью выйти. Поэтому, когда наш цикл, который читается с q, получает StopIteration из q, он завершается, а затем функция завершается, и задание порождения завершено.