Приложение django позволяет пользователям отправлять сообщения друг другу, и я объединяю некоторые из последних сообщений вместе и отправляю их по электронной почте с помощью сельдерея и redis.Django celery task duplication: не удается заблокировать DB?
Каждый раз, когда пользователь отправляет сообщение, я добавляю сообщение в db и запускаю задачу async для объединения сообщений этого пользователя за последние 60 секунд и отправки их по электронной почте.
tasks.pushMessagePool.apply_async(args = (fromUser,), countdown = 60)
Если пользователь посылает 5 сообщений в течение следующих 60 секунд, то мое предположение, что 5 задача должна быть создана, но только первая задача отправляет электронную почту, а остальные 4 задачи не делать ничего. Я реализовал простой механизм блокировки, чтобы убедиться, что сообщения рассматриваются только один раз и обеспечивают блокировку db.
@shared_task
def pushMessagePool(fromUser, ignore_result=True):
lockCode = randint(0,10**9)
data.models.Messages.objects.filter(fromUser = fromUser, locked=False).update(locked=True, lockCode = lockCode)
M = data.models.Messages.objects.filter(fromUser = fromUser, lockCode = lockCode)
sendEmail(M,lockCode)
С этой настройкой я по-прежнему получаю случайные (~ 10%) дубликаты. Дубликаты будут срабатывать в пределах 10 мс друг от друга, и у них есть разные блокировки.
Почему этот механизм блокировки не работает? Имеет ли сельдерей ссылку на старый снимок БД? Это не имеет никакого смысла.
Таймаут видимости по умолчанию для Redis составляет 1 час, а задача определенно короче часа. Но даже если он скончался в конце задачи, почему бы не заблокировать = True предотвратить будущие задачи для обработки одних и тех же записей? – djangojack