Airflow如何确定何时重新导入DAG文件



我看到了Airflow如何处理现有DAG文件的一些有趣行为。例如,我有一个DAG文件,如下所示:

# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())

在上面的代码中,DagGenerator.generate()返回一个包含两项的字典:

{
'my_dag': DAG(...),
'my_op': DummyOperator(...),
}

从技术上讲,这个代码应该相当于:

# my_pipeline_equivalent.py
my_dag = DAG(...)
my_op = DummyOperator(...)

由于某些原因,Airflow没有拾取文件my_pipeline.py,而可以毫无问题地拾取my_pipeline_equivalent.py

但是,如果我在my_pipeline.py中添加以下代码,则可以拾取两个dag(即my_dagmy_dag2(。

# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
# newly added
from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
x = DAG(
'my_dag2',
schedule_interval="@daily",
start_date=datetime(2021, 1, 1),
catchup=False,
default_args={
"retries": 2, # If a task fails, it will retry 2 times.
},
tags=['example'],
)
b = DummyOperator(task_id='d3', dag=x)

让我感到奇怪的是,如果我现在像下面这样评论新添加的部分,my_dag仍然可以在my_dag2不在的时候被拿起。

# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
# newly added
#from airflow import DAG
#from datetime import datetime
#from airflow.operators.dummy_operator import DummyOperator
#x = DAG(
#    'my_dag2',
#    schedule_interval="@daily",
#    start_date=datetime(2021, 1, 1),
#    catchup=False,
#    default_args={
#        "retries": 2, # If a task fails, it will retry 2 times.
#    },
#    tags=['example'],
#)
#b = DummyOperator(task_id='d3', dag=x)

然而,如果我真的删除了注释的代码,那么my_dag也就不见了。为了添加回my_dag,我必须添加回不带注释的my_dag2代码(注释后的my_dag2代码以前有效,现在不起作用(。

有人能帮我了解这里发生了什么吗?如果我没记错的话,Airflow有一些逻辑来确定何时/是否导入/重新导入Python文件。有人知道代码中的逻辑在哪里吗?

谢谢。

一些额外的发现。一旦我有两个DAG(my_dagmy_dag2(被拾取。如果我将my_pipeline.py更改为仅保留DAG导入,即使我注释掉了导入,my_dag仍然可以被拾取,但我无法删除该行。如果我真的删除了它,my_dag就会再次消失。

# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
# from airflow import DAG

我的猜测是,Airflow一定在读取python文件,并以字符串的形式查找该导入。

如果按照Airflow处理文件的步骤,您将得到以下代码行:

return all(s in content for s in (b'dag', b'airflow'))

这意味着Airflow文件处理器将忽略不包含两个单词dagairflow的文件。

如果你想处理你的模块,并且你已经有了单词dag,你可以在开头添加一个注释,其中包含单词airflow:

# airflow
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())

或者只是将类DagGenerator重命名为AirflowDagGenerator来解决所有模块中的问题。

相关内容

  • 没有找到相关文章

最新更新