2017-01-01 3 views
1

Я пытаюсь создать веб-сервер для сбора «команд» через AJAX, а затем распространять команды клиентам через длительный опрос.Python Tornado - Как реализовать сервер с длинными опросами для чтения из очереди

Цель состоит в том, что кто-то POST передает некоторые данные в/add-command.

Другой клиент реализует запрос/опрос клиента с большим опросом, ожидающий выполнения команды.

Я думаю, что очередь - это правильная структура данных, используемая для удержания команд, ожидающих внимания. Я хотел бы, чтобы команды по существу были немедленно распространены на любого клиента с длительным опросом, но удерживались, если в настоящий момент ни один клиент не проводит опрос.

Вот мой скрипт python.

import os 
import time 
import tornado.httpserver 
import tornado.ioloop 
import tornado.web 
import tornado.gen 
import Queue 
import multiprocessing.pool 
import mysql.connector 
import urlparse 
import uuid 
import json 

_commandQueue = Queue.Queue() 
_commandPollInterval = 0.2 
_commandPollTimeout = 10 

class HomeHandler(tornado.web.RequestHandler): 
    def get(self): 
     self.render("home.htm") 

class AddCommandHandler(tornado.web.RequestHandler): 
    def post(self): 
     d = urlparse.parse_qs(self.request.body) 
     _commandQueue.put(d) 
     self.write(str(True)) 

class PollHandler(tornado.web.RequestHandler): 
    @tornado.gen.coroutine 
    def get(self): 
     self.write("start") 
     d = 1 
     d = yield self.getCommand() 
     self.write(str(d)) 
     self.write("end") 
     self.finish() 
    @tornado.gen.coroutine 
    def getCommand(self): 
     start = time.time() 
     while (time.time() - start) < _commandPollTimeout * 1000: 
      if not _commandQueue.empty: 
       return _commandQueue.get() 
      else: 
       time.sleep(_commandPollInterval) 
     return None 

def main(): 
    application = tornado.web.Application(
     [ 
      (r"/", HomeHandler), 
      (r"/add-command", AddCommandHandler), 
      (r"/poll", PollHandler), 
     ], 
     debug=True, 
     template_path=os.path.join(os.path.dirname(__file__), "templates"), 
     static_path=os.path.join(os.path.dirname(__file__), "static"), 
    ) 
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000))) 
    tornado.ioloop.IOLoop.instance().start() 

if __name__ == "__main__": 
    main() 

AddCommandHandler прекрасно работает, чтобы положить вещи в _commandQueue.

PollHandler запрос только раз. Если я назову PollHandler, он, похоже, заблокирует _commandQueue, и я не могу его отложить.

Я подозреваю, что мне нужно присоединиться к очереди, но я не могу найти подходящее время для этого в коде.

ОБНОВЛЕНИЕ - Вот мой окончательный код, благодаря ответам

import os 
import time 
import datetime 
import tornado.httpserver 
import tornado.ioloop 
import tornado.web 
import tornado.gen 
import tornado.queues 
import urlparse 
import json 

_commandQueue = tornado.queues.Queue() 
_commandPollInterval = 0.2 
_commandPollTimeout = 10 

class HomeHandler(tornado.web.RequestHandler): 
    def get(self): 
     self.render("home.htm") 

class AddCommandHandler(tornado.web.RequestHandler): 
    def get(self): 
     cmd = urlparse.parse_qs(self.request.body) 
     _commandQueue.put(cmd) 
     self.write(str(cmd)) 
    def post(self): 
     cmd = urlparse.parse_qs(self.request.body) 
     _commandQueue.put(cmd) 
     self.write(str(cmd)) 

class PollHandler(tornado.web.RequestHandler): 
    @tornado.gen.coroutine 
    def get(self): 
     cmd = yield self.getCommand() 
     self.write(str(cmd)) 
    @tornado.gen.coroutine 
    def getCommand(self): 
     try: 
      cmd = yield _commandQueue.get(
       timeout=datetime.timedelta(seconds=_commandPollTimeout) 
      ) 
      raise tornado.gen.Return(cmd) 
     except tornado.gen.TimeoutError: 
      raise tornado.gen.Return() 

def main(): 
    application = tornado.web.Application(
     [ 
      (r"/", HomeHandler), 
      (r"/add-command", AddCommandHandler), 
      (r"/poll", PollHandler), 
     ], 
     debug=True, 
     template_path=os.path.join(os.path.dirname(__file__), "templates"), 
     static_path=os.path.join(os.path.dirname(__file__), "static"), 
    ) 
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000))) 
    tornado.ioloop.IOLoop.instance().start() 

if __name__ == "__main__": 
    main() 

ответ

1

В асинхронной модели вы должны опустить блокирование операции, time.sleep зло в вашем коде. Более того, я думаю, что лучший способ заключается в использовании (в асинхронном интерфейсе) очереди смерча - tornado.queue.Queue и использовать асинхронный получить:

import datetime 
import tornado.gen 
import tornado.queues 

_commandQueue = tornado.queues.Queue() 


    # ...rest of the code ... 

    @tornado.gen.coroutine 
    def getCommand(self): 
     try: 
      # wait for queue item if cannot obtain in timeout raise exception 
      cmd = yield _commandQueue.get(
       timeout=datetime.timedelta(seconds=_commandPollTimeout) 
      ) 
      return cmd 
     except tornado.gen.Timeout: 
      return None 

Примечания: Модуль tornado.queues сов доступны с Tornado 4.x, если вы используете один старше, Toro поможет.

+0

Это указывало на меня в правильном направлении. Вот несколько незначительных поправок. это должно быть tornado.queues.Queue. Вы также не можете вернуться из сопрограммы, поэтому вместо этого мне пришлось поднять tornado.gen.Return (cmd). Большое спасибо, я так застрял. – OneCleverMonkey

+0

Я обновил ваш комментарий. 'return' действителен в coroutine (функции генератора) с python 3.2. – kwarunek

1

Вы можете НЕ использование сна в слушателем, так как он блокирует чтение из входного потока. time.sleep(_commandPollInterval). То, что вы должны использовать является yield gen.sleep(_commandPollInterval)

 Смежные вопросы

  • Нет связанных вопросов^_^