2015-04-30 2 views
7

Я бегу Django, Celery и RabbitMQ. То, что я пытаюсь достичь, состоит в том, чтобы гарантировать, что задачи, связанные с одним пользователем, выполняются по порядку (в частности, по одному в то время я не хочу совмещать задачи на пользователя)Как обеспечить выполнение выполнения задачи для пользователя с помощью Celery, RabbitMQ и Django?

  • всякий раз, когда добавляется новая задача для пользователя, это должно зависеть от недавно добавленной задачи. Дополнительная функциональность может включать не добавление задачи в очередь, если задача этого типа поставлена ​​в очередь для этого пользователя и еще не запущена.

Я сделал некоторые исследования и:

  • Я не мог найти способ связать вновь созданную задачу с уже находится в очереди один в самом сельдерее, цепи, кажется, только в состоянии связать новые задачи ,
  • Я думаю, что обе функции можно реализовать с помощью специального обработчика сообщений RabbitMQ, хотя в конце концов это может быть сложно.
  • Я также читал о celery-tasktree, и это может быть самым простым способом для обеспечения порядка выполнения, но как связать новую задачу с уже «applied_async» task_tree или queue? Есть ли способ, которым я мог бы реализовать эту дополнительную не дублируемую функциональность с помощью этого пакета?

Edit: Существует этот и этот «замок» пример в celery cookbook и как концепция хорошо, я не могу видеть возможный способ заставить его работать как положено в моем случае - просто, если я не могу получить блокировку для пользователя, задача должна быть повторена, но это означает, что нужно довести ее до конца очереди.

Какой был бы лучший способ действий здесь?

+0

Я предполагаю, что вы не знаете, какие задачи для конкретного пользователя перед установкой задачи? –

+0

Почему бы не создать очередь самостоятельно (на пользователя) и взять сельдерей оттуда? – trinchet

ответ

0

Если вы настроите работников сельдерея так, чтобы они могли выполнять только одну задачу за раз (см. Параметр worker_concurrency), тогда вы можете обеспечить параллелизм, который вам нужен, для каждого пользователя. Используя метод, как

NUMBER_OF_CELERY_WORKERS = 10 

def get_task_queue_for_user(user): 
    return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS) 

получить очереди задач на основе идентификатора пользователя, каждая задача будет возложена на одной и той же очереди для каждого пользователя. Работникам необходимо настроить только потребление задач из одной очереди задач.

Он будет играть так:

  1. Пользователь 49 запускает ет задачу

  2. Задача посылается user_queue_9

  3. Когда один и только сельдерей рабочий, который слушает user_queue_9 готов к использованию новой задачи, задание выполнено

Это Hacky ответ, хотя, потому что

  • требует только одного рабочего сельдерея для каждой очереди является хрупкой системой - если сельдерей рабочего останавливается, вся очередь останавливается

  • рабочие работает неэффективно

+0

Основываясь на этом, почему бы вам просто не использовать десять ведер, и сопоставить их с «user.id% 10» или в случае строки «hash (user.id)% 10»? – knipknap

+0

Спасибо, вы поняли, что я не решил параллельную часть вопроса - мой ответ в настоящее время еще не верен. –