2016-08-24 6 views
8

В основном я вижу, что Airflow используется для заданий, связанных с данными ETL/Bid. Я пытаюсь использовать его для бизнес-процессов, когда действие пользователя запускает множество зависимых задач в будущем. Некоторые из этих задач могут быть очищены (удалены) на основе некоторых других действий пользователя. Я думал, что лучший способ справиться с этим - через динамические идентификаторы задач. Я читал, что Airflow поддерживает динамические идентификаторы даг. Итак, я создал простой скрипт python, который принимает идентификатор DAG и идентификатор задачи в качестве параметров командной строки. Тем не менее, у меня проблемы с работой. Это дает ошибке dag_id. Кто-нибудь пробовал это? Вот код для сценария (назовем его tmp.py), который я выполнить в командной строке, как питон (питон tmp.py 820 2016-08-24T22: 50: 00):Динамические DAG и идентификаторы задач потока

from __future__ import print_function 
import os 
import sys 
import shutil 
from datetime import date, datetime, timedelta 
from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
execution = '2016-08-24T22:20:00' 
if len(sys.argv) > 2 : 
    dagid = sys.argv[1] 
    taskid = 'Activate' + sys.argv[1] 
    execution = sys.argv[2] 
else: 
    dagid = 'DAGObjectId' 
    taskid = 'Activate' 
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1} 
dag = DAG(dag_id = dagid, 
     default_args=default_args, 
     schedule_interval='@once', 
    ) 
globals()[dagid] = dag 
task1 = BashOperator(
    task_id = taskid, 
    bash_command='ls -l', 
    dag=dag) 

fakeTask = BashOperator(
    task_id = 'fakeTask', 
    bash_command='sleep 5', 
    retries = 3, 
    dag=dag) 
task1.set_upstream(fakeTask) 

airflowcmd = "airflow run " + dagid + " " + taskid + " " + execution 
print("airflowcmd = " + airflowcmd) 
os.system(airflowcmd) 

ответ

7

После многочисленных проб и ошибок , Я смог понять это. Надеюсь, это поможет кому-то. Вот как это работает: вам нужно иметь итератор или внешний источник (файл/таблицу базы данных), чтобы динамически генерировать dags/task через шаблон. Вы можете сохранить имена дага и задач статическими, просто назначьте их идентификаторы динамически, чтобы отличить один провал от другого. Вы помещаете этот скрипт python в папку dags. Когда вы запускаете планировщик воздушного потока, он запускается через этот скрипт на каждом пульсе и записывает DAG в таблицу dag в базе данных. Если dag (уникальный dag id) уже написан, он просто пропустит его. Планировщик также рассматривает график отдельных DAG, чтобы определить, какой из них готов к исполнению. Если DAG готов к исполнению, он выполняет его и обновляет свой статус. Вот пример кода:

from airflow.operators import PythonOperator 
from airflow.operators import BashOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 
import sys 
import time 

dagid = 'DA' + str(int(time.time())) 
taskid = 'TA' + str(int(time.time())) 

input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule' 

def my_sleeping_function(random_base): 
    '''This is a function that will run within the DAG execution''' 
    time.sleep(random_base) 

def_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 'email_on_failure': False,     
    'retries': 1, 'retry_delay': timedelta(minutes=2) 
} 
with open(input_file,'r') as f: 
    for line in f: 
     args = line.strip().split(',') 
    if len(args) < 6: 
     continue 
    dagid = 'DAA' + args[0] 
    taskid = 'TAA' + args[0] 
    yyyy = int(args[1]) 
    mm  = int(args[2]) 
    dd  = int(args[3]) 
    hh  = int(args[4]) 
    mins = int(args[5]) 
    ss  = int(args[6]) 
    dag = DAG(
     dag_id=dagid, default_args=def_args, 
     schedule_interval='@once', start_date=datetime(yyyy,mm,dd,hh,mins,ss) 
     ) 

    myBashTask = BashOperator(
     task_id=taskid, 
     bash_command='python /home/directory/airflow/sendemail.py', 
     dag=dag) 

    task2id = taskid + '-X' 

    task_sleep = PythonOperator(
     task_id=task2id, 
     python_callable=my_sleeping_function, 
     op_kwargs={'random_base': 10}, 
     dag=dag) 

    task_sleep.set_upstream(myBashTask) 

f.close() 
+1

благодарит за обмен кода! возникли ли у вас какие-либо проблемы с выбором времени, так как планировщик будет выполнять все эти шаги на каждом пульсе? – mishkin

+2

@ dean-sha Я бы создал входной файл как задачу восходящего потока. Таким образом, он работает только один раз за работу, а не во время каждого биения. –

+0

@ Мишкин - Это было не слишком тяжело. Всего несколько миллисекунд. Фактически, я заменил его таблицей/запросом базы данных, которая содержит только dags/tasks, которые нужно запустить. –

6

От How can I create DAGs dynamically?:

Airflow смотрит в вас [так в оригинале] DAGS_FOLDER для модулей, содержащих объекты DAG в их глобальном пространстве имен, и добавляет объекты, которые он находит в DagBag , Зная это все, что нам нужно, это способ динамически назначать переменную в глобальном пространстве имен, что легко сделать в python, используя функцию globals() для стандартной библиотеки, которая ведет себя как простой словарь.

for i in range(10): 
    dag_id = 'foo_{}'.format(i) 
    globals()[dag_id] = DAG(dag_id) 
    # or better, call a function that returns a DAG object!