2016-10-11 9 views
0

У меня возникла проблема в том, что мне нужно записать значения, сгенерированные потребителем на диск. Я не хочу открывать новый экземпляр файла для записи каждый раз, поэтому я решил использовать вторую очередь и другого потребителя для записи на диск из одного Greenlet. Проблема с моим кодом заключается в том, что вторая очередь не потребляет асинхронный вызов из первой очереди. Первая очередь заканчивается первой, а затем вторая очередь потребляется. Я хочу записывать значения на диск одновременно, тогда генерируются другие значения. Спасибо за помощь!Gevent: Использование двух очередей с двумя пользователями без одновременной блокировки друг друга

#!/usr/bin/python 
#- * -coding: utf-8 - * - 
import gevent #pip install gevent 
from gevent.queue import * 
import gevent.monkey 
from timeit import default_timer as timer 
from time import sleep 
import cPickle as pickle 

gevent.monkey.patch_all() 

def save_lineCount(count): 
    with open("count.p", "wb") as f: 
     pickle.dump(count, f) 

def loader(): 
    for i in range(0,3): 
     q.put(i) 

def writer(): 
    while True: 
     task = q_w.get() 
     print "writing",task 
     save_lineCount(task) 

def worker(): 
    while not q.empty(): 
     task = q.get() 
     if task%2: 
      q_w.put(task) 
      print "put",task 
      sleep(10) 

def asynchronous(): 
    threads = [] 
    threads.append(gevent.spawn(writer)) 
    for i in range(0, 1): 
     threads.append(gevent.spawn(worker)) 
    start = timer() 
    gevent.joinall(threads,raise_error=True) 
    end = timer() 
    #pbar.close() 
    print "\n\nTime passed: " + str(end - start)[:6] 


q = gevent.queue.Queue() 
q_w = gevent.queue.Queue() 
gevent.spawn(loader).join() 
asynchronous() 

ответ

0

В целом, этот подход должен работать нормально. Есть некоторые проблемы с этим конкретным кодом, хотя:

  • Вызов time.sleep заставит все greenlets блокировать. Вам нужно либо позвонить gevent.sleep, либо обезвредить процесс, чтобы иметь только один блок greenlet (я вижу gevent.monkey импортированный, но patch_all не вызывается). Я подозреваю, что это главная проблема.

  • Запись в файл также является синхронной и заставляет все зеленые блокировать. Вы можете использовать FileObjectThread, если это серьезное узкое место.