2017-01-30 20 views
4

У меня есть API, который возвращает список других API.Периодическая задача сельдерея, выполняющая другие задачи сельдерея, не работает

Мне нужно получить доступ к этим API каждые 15 минут и поместить данные, возвращенные в базу данных.

Ниже приведено то, что я написал в файле celery_worker.py, используя сельдерей и redis. Но все задачи не начинаются.

list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json() 

CELERYBEAT_SCHEDULE = { 
    'every-15-minute': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': timedelta(minutes=15), 
    }, 
} 

@celery.task 
def access_one_API(one_API): 
    return requests.get(one_API).json() 

@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(): 
    for one_API in list_of_APIs: 
      task = access_one_API.delay(one_API) 
      # some codes to put all task.id into a list_of_task_id 

    for task_id in list_of_task_id: 
      # some codes to get the results of all tasks 
      # some codes to put all the results into a database 

fetch_data_of_all_APIs функция должна работать каждые 15 минут, которые предполагается использовать несколько рабочих, чтобы запустить функцию access_one_API

Сервер сельдерей начинается в терминале успешно, но ни fetch_data_of_all_APIs ни access_one_API начинается.

Если я вытаскиваю коды в пределах функции fetch_data_of_all_APIs, то access_one_API может запускаться и выполняться несколькими работниками сельдерея. Но как только я помещаю эти коды в функцию и украшаю ее @celery.task, тогда обе функции не запускаются.

Поэтому я считаю, что это должно иметь какое-то отношение к сельдерею.

Большое спасибо заранее.

+0

Обратите внимание, что вам требуется '@ celery.task()' decorator. Кроме того, вам нужно проверить параметры конфигурации «celery-beat», так как в текущей версии сельдерея используются настройки с нижним регистром. –

ответ

0

Здесь пример настройки периодических задач с подзадачами в сельдерее (я установил 20 секунд для демонстрации). tasks.py:

import celery 
from celery.canvas import subtask 
from celery.result import AsyncResult 
# just for example list of integer values 
list_of_APIs = [1, 2, 3, 4] 


@celery.task(name='access_one_API') 
def access_one_API(api): 
    """ 
    Sum of subtask for demonstration 
    :param int api: 
    :return: int 
    """ 
    return api + api 


@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(list_of_APIs): 
    list_task_ids = [] 

    for api in list_of_APIs: 
     # run of celery subtask and collect id's of subtasks 
     task_id = subtask('access_one_API', args=(api,)).apply_async().id 
     list_task_ids.append(task_id) 

    result_sub_tasks = {} 

    for task_id in list_task_ids: 
     while True: 
      task_result = AsyncResult(task_id) 
      if task_result.status == 'SUCCESS': 
       # if subtask is finish add result and check result of next subtask 
       result_sub_tasks[task_id] = task_result.result 

       break 

    print result_sub_tasks 
    # do something with results of subtasks here... 


app = celery.Celery(
    'tasks', 
    broker='redis://localhost:6379/0', 
    backend='redis://localhost:6379/0', 
) 


app.conf.beat_schedule = { 
    'add-every-20-seconds': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': 20.0, 
     # args for fetch_data_of_all_APIs 
     'args': (list_of_APIs,) 
    }, 
} 

Run сельдерей: celery worker -A tasks.app --loglevel=info --beat

след от терминала:

[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2} 

Надеется, что это помогает.