2012-05-22 5 views
6

Я использую Django и Celery, и я пытаюсь настроить маршрутизацию на несколько очередей. Когда я указываю задачи и exchange (либо в декораторе задачи, либо с использованием apply_async()), задача не добавляется к брокеру (который является Kombu, подключающимся к моей базе данных MySQL).Django & Celery - Проблемы с маршрутизации

Если я укажу имя очереди в декораторе задачи (что будет означать, что ключ маршрутизации игнорируется), задача работает нормально. Это, по-видимому, проблема с настройкой маршрутизации/обмена.

Любая идея, в чем проблема?

Вот установка:

settings.py

INSTALLED_APPS = (
    ... 
    'kombu.transport.django', 
    'djcelery', 
) 
BROKER_BACKEND = 'django' 
CELERY_DEFAULT_QUEUE = 'default' 
CELERY_DEFAULT_EXCHANGE = "tasks" 
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" 
CELERY_DEFAULT_ROUTING_KEY = "task.default" 
CELERY_QUEUES = { 
    'default': { 
     'binding_key':'task.#', 
    }, 
    'i_tasks': { 
     'binding_key':'important_task.#', 
    }, 
} 

tasks.py

from celery.task import task 

@task(routing_key='important_task.update') 
def my_important_task(): 
    try: 
     ... 
    except Exception as exc: 
     my_important_task.retry(exc=exc) 

Инициировать задача:

from tasks import my_important_task 
my_important_task.delay() 
+0

Как вы проходите routing_key ? С async_apply? – mher

+0

Я использую метод 'delay()', который является просто ярлыком для 'apply_async()'. Я пытаюсь сохранить спецификацию «routing_key» с помощью метода задачи (через декоратор), а не когда он вызывается. Я попытался передать ключ, используя 'apply_async()' вместо этого, но у меня такая же проблема. –

+0

Задержка не принимает ключевое слово routing_key. Это упрощенный вариант apply_async, но они не совпадают. – mher

ответ

43

Вы используете Django ORM в качестве брокера, который означает, что декларации хранятся только в памяти (см, бесспорно, трудно найти, транспорт сравнительную таблицу на http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison)

Итак, когда вы применяете эту задачу с routing_key important_task.update не сможет отправить , поскольку он еще не объявил очередь.

Это будет работать, если вы сделаете это:

@task(queue="i_tasks", routing_key="important_tasks.update") 
def important_task(): 
    print("IMPORTANT") 

Но было бы гораздо проще для вас, чтобы использовать функцию автоматической маршрутизации, , так как нет ничего, что показывает, что вам нужно использовать ' обмен раздела», использовать автоматическую маршрутизацию просто удалить настройки:

  • CELERY_DEFAULT_QUEUE,
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

И объявите ваша задача, как это:

@task(queue="important") 
def important_task(): 
    return "IMPORTANT" 

, а затем начать рабочий потребляющего из этой очереди:

$ python manage.py celeryd -l info -Q important 

или потреблять как из (celery) очереди по умолчанию и important очереди:

$ python manage.py celeryd -l info -Q celery,important 

Еще одна хорошей практика, чтобы не жёстко имен очередей в задачи и использовать CELERY_ROUTES вместо :

@task 
def important_task(): 
    return "DEFAULT" 

затем в ваших настройках:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}} 

Если вы по-прежнему настаиваете на использование тему обмена, то вы можете добавить этот маршрутизатор автоматически объявить все очереди в первый раз задачи отправленный:

class PredeclareRouter(object): 
    setup = False 

    def route_for_task(self, *args, **kwargs): 
     if self.setup: 
      return 
     self.setup = True 
     from celery import current_app, VERSION as celery_version 
     # will not connect anywhere when using the Django transport 
     # because declarations happen in memory. 
     with current_app.broker_connection() as conn: 
      queues = current_app.amqp.queues 
      channel = conn.default_channel 
      if celery_version >= (2, 6): 
       for queue in queues.itervalues(): 
        queue(channel).declare() 
      else: 
       from kombu.common import entry_to_queue 
       for name, opts in queues.iteritems(): 
        entry_to_queue(name, **opts)(channel).declare() 
CELERY_ROUTES = (PredeclareRouter(),) 
+0

Спасибо за объяснение! –

+2

Является ли эта проблема с объявлениями очередей и обменами, разрешенными в Celery 3? Я использую новые 'CELERY_QUEUES = (Queue (...), ...)' в настройках, означает ли это, что очереди объявляются правильно? –

+0

Примечание: В Celery 4.0 и выше CELERY_ROUTES был заменен на CELERY_TASK_ROUTES. Мог бы спасти чье-то время. –