2015-09-10 8 views
0

Я пытаюсь создать неблокирующее веб-приложение, которое использует Tornado. Это приложение использует PeriodicCallback как планировщик для захвата данных из новостных сайтов:tornado Periodiccallback и операции сокета внутри обратного вызова

for nc_uuid in self.LIVE_NEWSCOLLECTORS.keys(): 
      self.LIVE_NEWSCOLLECTORS[nc_uuid].agreggator,ioloop=args 
      period=int(self.LIVE_NEWSCOLLECTORS[nc_uuid].period)*60 
      if self.timer is not None: period = int(self.timer) 

      #self.scheduler.add_job(func=self.LIVE_NEWSCOLLECTORS[nc_uuid].getNews,args=[self.source,i],trigger='interval',seconds=10,id=nc_uuid) 
      task = tornado.ioloop.PeriodicCallback(lambda:self.LIVE_NEWSCOLLECTORS[nc_uuid].getNews(self.source,i),1000*10,ioloop) 
      task.start() 

«GetData», который звонит в качестве обратного вызова имеет запрос асинхронной HTTP, который анализирует и отправил данные в TCPServer для анализа с помощью вызова метода process_responce :

@gen.coroutine 
def process_response(self,*args,**kwargs): 
buf = {'sentence':str('text here')} 
data_string = json.dumps(buf) 
s.send(data_string) 
while True: 
    try: 
     data = s.recv(100000) 
     if not data: 
       print "connection closed" 
       s.close() 
       break 
     else: 
      print "Received %d bytes: '%s'" % (len(data), data) 
      # s.close() 
      break 
    except socket.error, e: 
      if e.args[0] == errno.EWOULDBLOCK: 
       print 'error',errno.EWOULDBLOCK 
       time.sleep(1)   # short delay, no tight loops 
      else: 
       print e 
       break 
    i+=1 

Внутри process_response Я использую базовый пример для неблокирующих операций сокета. Process_response показывает примерно следующее: error 10035 error 10035 Получено 75 байт: '{"mode": 1, "keyword": "\ u0435 \ u0432 \ u0440 \ u043e", "предложение": "текст здесь"} '

Это выглядит нормально. Но при получении данных основной IOLoop блокируются! Если бы я попросил веб-сервер, он бы не возвращал мои anydata до тех пор, пока не закончится task_allback ... Где моя ошибка?

ответ

0

time.sleep() является блокирующей функцией и никогда не должен использоваться в неблокирующем коде. Вместо этого используйте yield gen.sleep().

Также рассмотрите возможность использования tornado.iostream.IOStream вместо операций с сырым сокетом.