2015-07-19 6 views
0

Следующий фрагмент кода инициирует задачи запуска в моей установке сельдерея:сельдерея и потерял сообщения

tasks.py:

@app.task(ignore_result=False) 
def asyncTransactionTask(txid): 
    Here I do something with txid and do not schedule additional tasks 

@app.task(ignore_result=True) 
def asyncCheckNotifications(*args): 

    try: 
     payments = # get an array of values 
     payments_tasks = [] 
     for payment in payments: 
      payments_tasks.append(asyncTransactionTask.s(payment)) 

     chain(group(payments_tasks) | asyncCheckNotifications.subtask()).apply_async(countdown=60) 
    except Exception as e: 
     logger.error(str(e)) 
     asyncCheckNotifications.apply_async(countdown=10) 
     raise e 

asyncCheckNotifications.delay() 

Я ожидал увидеть метод asyncCheckNotifications работает примерно каждую минуту, в то время как я получаю их каждые два минут.

Более того, если я проверяю запланированные задачи (celery -A myapp inspect scheduled), я вижу, что выполнение метода запланировано соответствующим образом, но когда я достигаю таймаута, он просто заменяется другим расписанием на следующую минуту, и ничего не запускается.

Я использую сельдерей 3.1.8. Брокер сообщений - RabbitMQ 3.2.4.

ответ

0

Между тем, я решил мою проблему, заменив следующее:

chain(group(payments_tasks) | asyncCheckNotifications.subtask()).apply_async(countdown=60) 

со следующим:

chain(group(payments_tasks) | asyncCheckNotifications.subtask(countdown=60)).delay()