2016-03-10 3 views
1

Приложение django позволяет пользователям отправлять сообщения друг другу, и я объединяю некоторые из последних сообщений вместе и отправляю их по электронной почте с помощью сельдерея и redis.Django celery task duplication: не удается заблокировать DB?

Каждый раз, когда пользователь отправляет сообщение, я добавляю сообщение в db и запускаю задачу async для объединения сообщений этого пользователя за последние 60 секунд и отправки их по электронной почте.

tasks.pushMessagePool.apply_async(args = (fromUser,), countdown = 60) 

Если пользователь посылает 5 сообщений в течение следующих 60 секунд, то мое предположение, что 5 задача должна быть создана, но только первая задача отправляет электронную почту, а остальные 4 задачи не делать ничего. Я реализовал простой механизм блокировки, чтобы убедиться, что сообщения рассматриваются только один раз и обеспечивают блокировку db.

@shared_task 
def pushMessagePool(fromUser, ignore_result=True): 
    lockCode = randint(0,10**9) 
    data.models.Messages.objects.filter(fromUser = fromUser, locked=False).update(locked=True, lockCode = lockCode) 
    M = data.models.Messages.objects.filter(fromUser = fromUser, lockCode = lockCode) 
    sendEmail(M,lockCode) 

С этой настройкой я по-прежнему получаю случайные (~ 10%) дубликаты. Дубликаты будут срабатывать в пределах 10 мс друг от друга, и у них есть разные блокировки.

Почему этот механизм блокировки не работает? Имеет ли сельдерей ссылку на старый снимок БД? Это не имеет никакого смысла.

ответ

0

Djangojack, вот аналогичная проблема? Но для SQS. Я не уверен, что это относится и к Редису?

При создании SQS очередь необходимо установить по умолчанию Видимости таймаута некоторого времени, что это больше, чем максимальное время вы ожидаете задачи бежать. Это время, когда SQS сделает сообщение невидимым для всех других потребителей после доставки одному потребителю. Я считаю, что по умолчанию - 30 секунд. Итак, если задача занимает более 30 секунд, SQS доставит то же сообщение другому потребителю, потому что он принимает , первый потребитель умер и не выполнил задачу.

Из комментария @ gustavo-ambrozio на this answer.

+0

Таймаут видимости по умолчанию для Redis составляет 1 час, а задача определенно короче часа. Но даже если он скончался в конце задачи, почему бы не заблокировать = True предотвратить будущие задачи для обработки одних и тех же записей? – djangojack