В основном я вижу, что Airflow используется для заданий, связанных с данными ETL/Bid. Я пытаюсь использовать его для бизнес-процессов, когда действие пользователя запускает множество зависимых задач в будущем. Некоторые из этих задач могут быть очищены (удалены) на основе некоторых других действий пользователя. Я думал, что лучший способ справиться с этим - через динамические идентификаторы задач. Я читал, что Airflow поддерживает динамические идентификаторы даг. Итак, я создал простой скрипт python, который принимает идентификатор DAG и идентификатор задачи в качестве параметров командной строки. Тем не менее, у меня проблемы с работой. Это дает ошибке dag_id. Кто-нибудь пробовал это? Вот код для сценария (назовем его tmp.py), который я выполнить в командной строке, как питон (питон tmp.py 820 2016-08-24T22: 50: 00):Динамические DAG и идентификаторы задач потока
from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2 :
dagid = sys.argv[1]
taskid = 'Activate' + sys.argv[1]
execution = sys.argv[2]
else:
dagid = 'DAGObjectId'
taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid,
default_args=default_args,
schedule_interval='@once',
)
globals()[dagid] = dag
task1 = BashOperator(
task_id = taskid,
bash_command='ls -l',
dag=dag)
fakeTask = BashOperator(
task_id = 'fakeTask',
bash_command='sleep 5',
retries = 3,
dag=dag)
task1.set_upstream(fakeTask)
airflowcmd = "airflow run " + dagid + " " + taskid + " " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)
благодарит за обмен кода! возникли ли у вас какие-либо проблемы с выбором времени, так как планировщик будет выполнять все эти шаги на каждом пульсе? – mishkin
@ dean-sha Я бы создал входной файл как задачу восходящего потока. Таким образом, он работает только один раз за работу, а не во время каждого биения. –
@ Мишкин - Это было не слишком тяжело. Всего несколько миллисекунд. Фактически, я заменил его таблицей/запросом базы данных, которая содержит только dags/tasks, которые нужно запустить. –