2015-03-10 2 views
0

Учитывая расположение:Celery KeyError при упаковке функции app.task с импортированным декоратором; Ошибки только ж/импорт

background \ 
    tasks \ 
     __init__.py 
     generic.py 
     helpers.py 
    __init__.py 
    _server.py 
    config.py 
    router.py 
    server.py 

и запуск _server.py с celery -A background._server worker

Я по заданному KeyError: u'generic.adder' в Работником при попытке вызова функции generic.adder с .delay(..)

Функция сумматора:

generic.py

from background.server import app 
from background.tasks.helpers import standardized_task 

@standardized_task(app, name='generic.adder') 
def adder(x, y): 
    return x + y 

... завернутый в функцию, которая принимает экземпляр app и стандартизирует ввод/вывод задачи Сельдерея на объект JSON, который возвращает результаты и функцию. (включен ниже) Однако проблема заключается в , когда эта функция обертки находится в том же файле, что и generic.adder, она работает безупречно - когда она импортирована и используется, как указано выше, она вызывает ключевую ошибку.

Я заставил поверить, что обертка какая-то образом изменив атрибут name=.. переданной app.task с именем функции из helpers.py, которая вызывает буквальное имя generic.adder не будет найдены при обращении с этой задачей.

Важно также отметить, что если вы попробуете позвонить adder(..) изнутри _server.py (модуль работает от CLI сельдерея), он работает безупречно; это только при вызове через распределенный интерфейс, который вызывает ошибку; смысл, импортные работы независимо от Сельдерея.

Файл helpers.py

__author__ = 'Blake' 

import types 

JSON_TYPES = [ 
    dict, list, unicode, str, int, long, float, bool, types.NoneType 
] 

def standardized_task(app, *args, **kwargs): 
    def wrapped_task(fn): 
     def wrapped_fn(*fnargs, **fnkwargs): 
      throws = fnkwargs.get('throws', Exception) 
      raises = fnkwargs.get('raises', False) 

      if not hasattr(throws, '__call__') and not isinstance(throws(), Exception): 
       raise ValueError('throws value not of type Exception: %s' % type(throws)) 

      result, error = None, None 

      try: 
       result = fn(*fnargs, **fnkwargs) 

       if type(result) not in JSON_TYPES: 
        result = unicode(result) 

      except throws, e: 
       error = e 

       if raises: 
        raise 
      finally: 
       return { 
        'result': result, 
        'error': str(error) if error else None, 
        'meta': { 
         'args': fnargs, 'kwargs': fnkwargs 
        } 
       } 

     return app.task(wrapped_fn, *args, **kwargs) 
    return wrapped_task 

Файл _server.py

from background.server import app 
from background.tasks.generic import * 

ответ

0

Ответ не использовать декоратора, но продлить celery.Task в абстрактный класс и использования, @app.task(name='...', base=MyNewAbstractTask)

следующий пост SO объясняет это лучше:

celery task and customize decorator

import types 

JSON_TYPES = [ 
    dict, list, unicode, str, int, long, float, bool, types.NoneType 
] 

class StandardizedTask(Task): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     return self.inner_run(*args, **kwargs) 

    def inner_run(self, *args, **kwargs): 
     throws = kwargs.get('throws', Exception) 
     raises = kwargs.get('raises', False) 

     if not hasattr(throws, '__call__') and not isinstance(throws(), Exception): 
      raise ValueError('throws value not of type Exception: %s' % type(throws)) 

     result, error = None, None 

     try: 
      result = self.run(*args, **kwargs) 

      if type(result) not in JSON_TYPES: 
       result = unicode(result) 

     except throws, e: 
      error = e 

      if raises: 
       raise 
     finally: 
      return { 
       'result': result, 
       'error': str(error) if error else None, 
       'meta': { 
        'args': args, 'kwargs': kwargs }}