2016-03-04 1 views
1

Я изменил пример очереди с производителем & потребителем этого Tornado documentation, но не похоже, что параметр timeout, переданный get(), работает вообще, так как потребитель не ждет 10 секунд до выброса исключения. В идеале производитель и потребитель будут работать одновременно. Кроме того, я не знаю, следует ли передать в параметре таймаута в секундах или миллисекундах:tornado асинхронная очередь не ждет

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

q = Queue() 

@gen.coroutine 
def consumer(): 
    try: 
     while True: 
      item = yield q.get(timeout=10000) 
      try: 
       print('Doing work on %s' % item)  
      finally: 
       q.task_done() 
    except gen.TimeoutError: 
     print('timeout') 
     return 

@gen.coroutine 
def producer(): 
    for item in range(5): 
     yield q.put(item) 
     print('Put %s' % item) 
     yield gen.sleep(2) 

@gen.coroutine 
def main(): 
    # Start consumer without waiting (since it never finishes). 
    IOLoop.current().spawn_callback(consumer) 
    yield producer()  # Wait for producer to put all tasks. 
    yield q.join()  # Wait for consumer to finish all tasks. 
    print('Done') 

IOLoop.current().run_sync(main) 

и вот его исполнение:

Put 0 
Doing work on 0 
timeout 
Put 1 
Put 2 
Put 3 
Put 4 

ответ

4

Таймаут

Как вы можете читать в Tornado' Queue.get docs:

Возвращает будущее, nce доступен элемент или вызывает tornado.gen.TimeoutError после таймаута.

Но это может быть весьма вводящим в заблуждение, поскольку timeout фактически deadline. Таким образом, он должен быть указан либо с datetime.timedelta object:

import datetime 
yield q.get(timeout=datetime.timedelta(seconds=1)) 

или абсолютное время:

timeout = 1.5 # in seconds, floats acceptable 
deadline = IOLoop.current().time() + timeout 
# in most cases IOLoop time is just time.time() 
# I've used separate variables only for the notion 

yield q.get(timeout=deadline) 

В Toro, что был слит в Торнадо, этот аргумент был назван deadline.

В вашем коде указан таймаут 10000, что означает крайний срок до Thu, 01 Jan 1970 02:46:40 GMT.

Потребительской петля

Поскольку у вас есть try/except блока на всей функции, в том числе while цикла, когда происходит TimeoutError ваш потребитель перестает работать. Перемещение обработки исключений в цикл while.

Рабочий пример:

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

q = Queue() 

@gen.coroutine 
def consumer(): 
    i = 0 
    while True: 
     i += 1 
     print('get cycle %s' % i) 
     try: 
      item = yield q.get(IOLoop.instance().time() + 3) 
      try: 
       print('Doing work on %s' % item) 
      finally: 
       q.task_done() 
     except gen.TimeoutError: 
      print('timeout') 

@gen.coroutine 
def producer(): 
    for item in range(5): 
     yield q.put(item) 
     print('Put %s' % item) 
     yield gen.sleep(2) 

@gen.coroutine 
def main(): 
    # Start consumer without waiting (since it never finishes). 
    IOLoop.current().spawn_callback(consumer) 
    yield producer()  # Wait for producer to put all tasks. 
    yield q.join()  # Wait for consumer to finish all tasks. 
    print('Done')