2015-03-28 8 views
0

Я пытаюсь разработать небольшое приложение, которое будет собирать данные о погоде из API. Я использовал APScheduler для выполнения функции каждые x минут. Я использую Python Tornado framework.APScheduler запускает функцию async в Tornado Python

Ошибки я получаю:

INFO  Job "GetWeather (trigger: interval[0:01:00], next run at: 2015-03-28 11:40:58 CET)" executed successfully 
ERROR Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0335C978>, <tornado.concurrent.Future object at 0x03374430>) 
Traceback (most recent call last): 
    File "C:\Python34\Lib\site-packages\tornado\ioloop.py", line 568, in _run_callback 
    ret = callback() 
    File "C:\Python34\Lib\site-packages\tornado\stack_context.py", line 275, in null_wrapper 
    return fn(*args, **kwargs) 
greenlet.error: cannot switch to a different thread 

который я думаю, что исходит из сопрограммных от GetWeather(), как, если удалить все asycn функции от него, он работает.

Я использую Motor для считывания необходимых координат и передачи их через API и хранения данных о погоде в MongoDB.

import os.path, logging 
import tornado.web 
import tornado.ioloop 
from tornado.httpclient import AsyncHTTPClient 
from tornado import gen 
from tornado.options import define, options 
from apscheduler.schedulers.tornado import TornadoScheduler 
import motor 

client = motor.MotorClient() 
db = client['apitest'] 

console_log = logging.getLogger(__name__) 

define("port", default=8888, help="run on the given port", type=int) 
define("debug", default=False, help="run in debug mode") 

class MainRequest (tornado.web.RequestHandler): 
    def get(self): 
     self.write("Hello") 

scheduler = TornadoScheduler() 

class ScheduledTasks(object): 
    def get(self): 
     print("This is the scheduler"); 

def AddJobs(): 
    scheduler.add_job(GetWeather, 'interval', minutes=1) 

def StartScheduler(): 
    scheduler.start(); 

def StopScheduler(): 
    scheduler.stop(); 

class Weather(tornado.web.RequestHandler): 
    def get(self): 
     self.write("This is the Weather Robot!") 
     GetWeather() 

@gen.coroutine 
def GetWeather(): 
    ''' 
    Getting city weather from forecast.io API 
    ''' 
    console_log.debug('Start: weather robot')  
    cursor = FindCities() 

    while (yield cursor.fetch_next): 
     city = cursor.next_object() 
     lat = str(city["lat"]) 
     lon = str(city["lon"])  
     http_client = AsyncHTTPClient() 
     response = yield http_client.fetch("https://api.forecast.io/forecast/3925d0668cf520768ca855951f1097cd/%s,%s" %(lat, lon)) 

     if response.error: 
      print ("Error:", response.error) 
      # Store all cities with errors in order to save them in the log file 
     else:   
      json = tornado.escape.json_decode(response.body) 
      temperature = json["currently"]["temperature"] 
      summary = json["currently"]["summary"] 
      db.cities.update({'_id': city["_id"]}, {'$set': {'temperature': temperature, 'summary': summary}}) 

    console_log.debug('End: weather robot') 
    return 

def FindCities(): 
    ''' 
    cities = [{ 
       "_id" : ObjectId("55165d07258058ee8dca2172"), 
       "name" : "London", 
       "country" : "United Kingdom", 
       "lat" : 51.507351, 
       "lon" : -0.127758 
      }, 
      { 
       "_id" : ObjectId("55165d07258058ee8dca2173"), 
       "name" : "Barcelona", 
       "country" : "Spain", 
       "lat" : 41.385064, 
       "lon" : 2.173403 
      }  
    ''' 
    cities = db.cities.find().sort([('_id', -1)]) 
    return cities 

def main(): 
    logging.basicConfig(level=logging.DEBUG,format='%(levelname)-8s %(message)s') 
    app = tornado.web.Application(
      [ 
       (r'/robots/weather', Weather), 
       (r'/', MainRequest) 
      ], 
      cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", 
      login_url="/auth/login", 
      template_path=os.path.join(os.path.dirname(__file__), "templates"), 
      static_path=os.path.join(os.path.dirname(__file__), "static"), 
      xsrf_cookies=True, 
      debug=options.debug, 
     ) 
    app.listen(options.port) 
    AddJobs() 
    StartScheduler() 
    tornado.ioloop.IOLoop.instance().start() 


if __name__ == "__main__": 
    main() 

Любая идея, что я делаю неправильно? Как я вижу в коде APScheduler, TornadoScheduler() работает в Tornado IOLoop ... (https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master)

Oh! Я забыл сказать, что идея состоит в том, чтобы выполнить задачу с помощью APScheduler или вручную.

Большое спасибо!

+0

Исследуя больше я верю, проблема заключается с simulatious использования двигателя и APScheduler, но я не знаю, почему ... – user3159821

ответ

1

По умолчанию TornadoScheduler запускает запланированные задачи в пуле потоков. Однако ваша конкретная задача использует IOLoop и поэтому ожидает, что она будет запущена в том же потоке. Чтобы исправить это, вы можете использовать метод add_callback() торнадо IOLoop, чтобы запланировать задачу, которая должна быть запущена в потоке IOLoop, как можно скорее.

Как так:

def your_scheduled_task(): 
    IOLoop.instance().add_callback(your_real_task_function) 

или даже лучше:

scheduler.add_job(IOLoop.instance().add_callback, 'interval', minutes=1, args=[GetWeather]) 
+0

Это похоже на правильный ответ. –

+0

Спасибо, Алекс! Я не уверен, как реализовать предлагаемое вами решение. Я не хочу казаться оппортунистическим, но вы могли бы привести пример? Я старался, но ничего не работал. Заранее спасибо! – user3159821

+0

Привет, Алекс! Это очень помогло. Большое спасибо, он работает сейчас. – user3159821

0

смотрит на меня как TornadoScheduler, даже если она интегрируется с IOLoop, он still runs operations on a thread pool:

def _create_default_executor(self): 
    """Creates a default executor store, specific to the particular scheduler type.""" 
    return ThreadPoolExecutor() 

Двигатель не нравится работает на нескольких потоков в одном процессе - я только случаи использования тест, где двигатель используется в основном потоке.

Я думаю, вам следует использовать Tornado PeriodicCallback вместо APScheduler, или вы должны использовать PyMongo с APScheduler (поэтому PyMongo работает на потоках фона) вместо Motor.

+0

Если не используются расширенные функции планировщика, PeriodicCallback действительно будет проще. –

+0

Спасибо вам всем! Мне нужно только запланировать выполнение некоторых задач в полночь, поэтому я хотел включить APScheduler. Я хочу, чтобы задачи выполнялись в определенное время. – user3159821