2016-12-29 3 views
1

Я заранее извиняюсь за то, что не предоставил MVCE - к сожалению, из-за характера этого вопроса, он не поддается минимальным примерам. Я думаю, что он все равно будет нести ответственность без MVCE.Почему эта конструкция группы цепей вызывает исключение в сельдерее?

У меня есть список задач, из которых клиент может выбрать подмножество задач для создания в Flask. Я создаю задачи, как это:

current_app.logger.info("Creating list of chained tasks..") 
chains = [functools.reduce(
    lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar) 
) for foo in foos] 

Все задачи имеют аналогичную функцию подписи, которая является чем-то вроде

@celery.task 
def my_task(baz, foo, bar): 
    # ... 
    return baz 

И я пытаюсь выполнить группу следующим образом:

current_app.logger.info("Created a group of chained tasks..") 
g = group(*chains) 
res = g.apply_async(args=(baz,), queue="default") 

Я нахожу, что когда вызывается apply_async, возникают два исключения:

Traceback (most recent call last): 
    File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__ 
    return self.__consumed[index] 
IndexError: list index out of range 

и

File "/Users/erip/Code/whatever.py", line 101, in blahblah 
    res = g.apply_async(args=(baz,), queue="default") 
    File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async 
    app = self.app 
    File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app 
    app = self.tasks[0].app 
    File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__ 
    self.__consumed.append(next(self.__it)) 
TypeError: 'Signature' object is not an iterator 

docs предположить, что мое построение цепочек действует, так что я не понимаю, почему асинхронная приложение вызывает проблемы.

Моя цель - создать группу цепей, которые применяются асинхронно. Я считаю, что такое поведение происходит только тогда, когда len(foos) == 1.

Кто-нибудь сталкивался с этой проблемой раньше?

ответ

2

Я встречал подобную проблему, сельдерей Документах следующее примечание:

Если только один аргумент передается, и этот аргумент является итерацию
то, что будет использоваться в качестве списка задач вместо : это
позволяет использовать group с генераторными выражениями.

Посмотрите на конструктор класса Group. Если мы передадим только одну подпись для инициализации объекта , эта подпись будет рассматриваться как генератор.

def __init__(self, *tasks, **options):          
    if len(tasks) == 1:              
     tasks = tasks[0]              
     if isinstance(tasks, group):           
      tasks = tasks.tasks            
     if not isinstance(tasks, _regen):          
      tasks = regen(tasks)            
    Signature.__init__(              
     self, 'celery.group',(), {'tasks': tasks}, **options     
    )                   
    self.subtask_type = 'group' 

В вашем случае, вы можете просто выполнить группу задач следующим образом:

current_app.logger.info("Created a group of chained tasks..") 
if len(chains) == 1: 
    g = group(chains) 
else: 
    g = group(*chains) 
res = g.apply_async(args=(baz,), queue="default") 
+0

Это, как я в конце концов решил. Довольно хаки, но это лучший способ, я полагаю. Спасибо за ответ! – erip