气流:从单独的文件创建DAG



在airflow中,我试图创建一个专用于在文件中生成DAG的函数:

dynamic_dags.py:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag

然后在另一个文件中,我试图从一个单独的文件导入dags:

load_dags.py:
from dynamic_dag import generate_dag
globals()["Dynamic_DAG_A"] = generate_dag('A')

然而,dag并没有显示在网络用户界面上。但如果我把它们放在一个文件中,如下代码所示,它就会起作用:

def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
globals()["Dynamic_DAG_A"] = generate_dag('A')

我想知道为什么把它放在两个单独的文件里不起作用。

我认为如果您使用的是Airflow 1.10,那么DAG文件应该包含DAG和airfow:

https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html?highlight=airflowignore#dags

在搜索DAG时,Airflow默认情况下只考虑包含字符串"Airflow"one_answers"DAG"的python文件。要考虑所有python文件,请禁用DAG_DISCOVERY_SAFE_MODE配置标志。

在Airflow 2中,它已经更改(轻微-dag不区分大小写(:

https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html

在DAG_FOLDER中搜索DAG时,Airflow只将包含字符串Airflow和DAG(不区分大小写(的Python文件视为优化。要考虑所有Python文件,请禁用DAG_DISCOVERY_SAFE_MODE配置标志。

我认为您只是错过了load_dags.py中的"气流"。你可以在任何地方添加它,包括评论。

最新更新