How to create a DAG from within another DAG in Apache Airflow



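I am trying to build a master DAG that creates more DAGs as I need them. I have the following Python file inside the dags_folder set in airflow.cfg. This code creates the master DAG in the database. The master DAG should read a text file and create one DAG for each line in that file. However, the DAGs created inside the master DAG are not added to the database. What is the correct way to create them?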

Version details:

Python version: 3.7

Apache Airflow version: 1.10.8

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
root_dir = "/home/user/TestSpace/airflow_check/res"
print("\n\n ===> \n Dag generator")
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),
    'concurrency': 1,
    'retries': 0
}

def greet(_name):
    message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())
    f = open("{}/greetings.txt".format(root_dir), "a+")
    print("\n\n =====> {}\n\n".format(message))
    f.write(message)
    f.close()

def create_dag(dag_name):
    with DAG(dag_name, default_args=default_args,
             schedule_interval='*/2 * * * *',
             catchup=False
             ) as i_dag:
        i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,
                                     op_args=["{}_{}".format("greet", dag_name)])
        i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
        i_opr_greet >> i_echo_op
        return i_dag

def create_all_dags():
    all_lines = []
    f = open("{}/../dag_names.txt".format(root_dir), "r")
    for x in f:
        all_lines.append(str(x))
    f.close()
    for line in all_lines:
        print("Dag creation for {}".format(line))
        globals()[line] = create_dag(line)

with DAG('master_dag', default_args=default_args,
         schedule_interval='*/1 * * * *',
         catchup=False
         ) as dag:
    echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
    create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)
    echo_op >> create_op

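You have two options:

  1. Use SubDagOperator (see the example DAG). Use this if your schedule intervals can be the same.
  2. Write Python DAG files: from the master DAG, create Python files containing the DAGs in AIRFLOW_HOME. You can use the Jinja2 template engine for this.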
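
Option 2 could look roughly like the sketch below. It is only an illustration under a few assumptions: it uses `string.Template` from the standard library in place of Jinja2 to stay dependency-free, the helper names `read_dag_names` and `write_dag_files` and the `generated_` file prefix are hypothetical, and the generated files reuse the Airflow 1.10 import paths from the question. DAG names are restricted to letters, digits, and underscores as a conservative choice, so that each name is safe to use in a file name.

```python
import os
import re
from string import Template

# Source text of one generated DAG file. string.Template stands in for
# Jinja2 here (an assumption of this sketch); $dag_id is the only placeholder.
DAG_FILE_TEMPLATE = Template('''\
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "start_date": dt.datetime(2020, 3, 22),
    "retries": 0,
}

with DAG("$dag_id", default_args=default_args,
         schedule_interval="*/2 * * * *", catchup=False) as dag:
    echo_op = BashOperator(task_id="echo", bash_command="echo `date`")
''')


def read_dag_names(names_file):
    """Return the non-empty lines of names_file with whitespace stripped."""
    with open(names_file) as f:
        return [line.strip() for line in f if line.strip()]


def write_dag_files(dag_names, dags_folder):
    """Write one DAG file per name into dags_folder and return the paths."""
    written = []
    for name in dag_names:
        # Reject anything that is not safe as a file name component.
        if not re.fullmatch(r"[A-Za-z0-9_]+", name):
            raise ValueError("unsupported DAG name: {!r}".format(name))
        path = os.path.join(dags_folder, "generated_{}.py".format(name))
        with open(path, "w") as f:
            f.write(DAG_FILE_TEMPLATE.substitute(dag_id=name))
        written.append(path)
    return written
```

A task in the master DAG could then call `write_dag_files(read_dag_names(names_file), dags_folder)` from a PythonOperator. Because each generated file defines its DAG at module level, the scheduler should pick it up the next time it scans the dags_folder, which is what the in-task `globals()` assignment in the question does not achieve.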

Take a look at TriggerDagRunOperator: https://airflow.apache.org/docs/stable/_api/airflow/operators/dagrun_operator/index.html

Example usage:

https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
