我看到了Airflow如何处理现有DAG文件的一些有趣行为。例如,我有一个DAG文件,如下所示:
# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
在上面的代码中,DagGenerator.generate()
返回一个包含两项的字典:
{
'my_dag': DAG(...),
'my_op': DummyOperator(...),
}
从技术上讲,这个代码应该相当于:
# my_pipeline_equivalent.py
my_dag = DAG(...)
my_op = DummyOperator(...)
由于某些原因,Airflow没有拾取文件my_pipeline.py
,而可以毫无问题地拾取my_pipeline_equivalent.py
。
但是,如果我在my_pipeline.py
中添加以下代码,则可以拾取两个dag(即my_dag
和my_dag2
(。
# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
# newly added
from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
x = DAG(
'my_dag2',
schedule_interval="@daily",
start_date=datetime(2021, 1, 1),
catchup=False,
default_args={
"retries": 2, # If a task fails, it will retry 2 times.
},
tags=['example'],
)
b = DummyOperator(task_id='d3', dag=x)
让我感到奇怪的是,如果我现在像下面这样评论新添加的部分,my_dag
仍然可以在my_dag2
不在的时候被拿起。
# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
# newly added
#from airflow import DAG
#from datetime import datetime
#from airflow.operators.dummy_operator import DummyOperator
#x = DAG(
# 'my_dag2',
# schedule_interval="@daily",
# start_date=datetime(2021, 1, 1),
# catchup=False,
# default_args={
# "retries": 2, # If a task fails, it will retry 2 times.
# },
# tags=['example'],
#)
#b = DummyOperator(task_id='d3', dag=x)
然而,如果我真的删除了注释的代码,那么my_dag
也就不见了。为了添加回my_dag
,我必须添加回不带注释的my_dag2
代码(注释后的my_dag2
代码以前有效,现在不起作用(。
有人能帮我了解这里发生了什么吗?如果我没记错的话,Airflow有一些逻辑来确定何时/是否导入/重新导入Python文件。有人知道代码中的逻辑在哪里吗?
谢谢。
一些额外的发现。一旦我有两个DAG(my_dag
和my_dag2
(被拾取。如果我将my_pipeline.py
更改为仅保留DAG导入,即使我注释掉了导入,my_dag
仍然可以被拾取,但我无法删除该行。如果我真的删除了它,my_dag
就会再次消失。
# my_pipeline.py
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
# from airflow import DAG
我的猜测是,Airflow一定在读取python文件,并以字符串的形式查找该导入。
如果按照Airflow处理文件的步骤,您将得到以下代码行:
return all(s in content for s in (b'dag', b'airflow'))
这意味着Airflow文件处理器将忽略不包含两个单词dag
和airflow
的文件。
如果你想处理你的模块,并且你已经有了单词dag
,你可以在开头添加一个注释,其中包含单词airflow
:
# airflow
from dag_gen import DagGenerator
g = globals()
g.update(DagGenerator.generate())
或者只是将类DagGenerator
重命名为AirflowDagGenerator
来解决所有模块中的问题。