Я использую Django и Celery, и я пытаюсь настроить маршрутизацию на несколько очередей. Когда я указываю задачи и exchange
(либо в декораторе задачи, либо с использованием apply_async()
), задача не добавляется к брокеру (который является Kombu, подключающимся к моей базе данных MySQL).Django & Celery - Проблемы с маршрутизации
Если я укажу имя очереди в декораторе задачи (что будет означать, что ключ маршрутизации игнорируется), задача работает нормально. Это, по-видимому, проблема с настройкой маршрутизации/обмена.
Любая идея, в чем проблема?
Вот установка:
settings.py
INSTALLED_APPS = (
...
'kombu.transport.django',
'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
'default': {
'binding_key':'task.#',
},
'i_tasks': {
'binding_key':'important_task.#',
},
}
tasks.py
from celery.task import task
@task(routing_key='important_task.update')
def my_important_task():
try:
...
except Exception as exc:
my_important_task.retry(exc=exc)
Инициировать задача:
from tasks import my_important_task
my_important_task.delay()
Как вы проходите routing_key ? С async_apply? – mher
Я использую метод 'delay()', который является просто ярлыком для 'apply_async()'. Я пытаюсь сохранить спецификацию «routing_key» с помощью метода задачи (через декоратор), а не когда он вызывается. Я попытался передать ключ, используя 'apply_async()' вместо этого, но у меня такая же проблема. –
Задержка не принимает ключевое слово routing_key. Это упрощенный вариант apply_async, но они не совпадают. – mher