我有一个DAG,其中包含三个bash任务,计划每天运行。
我想在所有 bash 脚本中访问 dag 实例(可能是 PID)的唯一 ID。
有什么办法可以做到这一点吗?
我正在寻找与Oozie类似的功能,我们可以在其中访问工作流xml或java代码中的WORKFLOW_ID。
有人可以指出我有关"如何在AirFlow DAG中使用内置和自定义变量"的AirFlow文档吗?
非常感谢帕里
对象的属性可以在 jinja2 中使用点表示法访问(请参阅 https://airflow.apache.org/code.html#macros)。在这种情况下,它只是:
{{ dag.dag_id }}
我利用了 Python 对象dag
打印出当前 DAG 的名称这一事实。 所以我只使用 Jinja2 来更改dag
名称:
{{ dag | replace( '<DAG: ', '' ) | replace( '>', '' ) }}
有点黑客,但它有效。
因此
clear_upstream = BashOperator( task_id='clear_upstream',
trigger_rule='all_failed',
bash_command="""
echo airflow clear -t upstream_task -c -d -s {{ ts }} -e {{ ts }} {{ dag | replace( '<DAG: ', '' ) | replace( '>', '' ) }}
"""
)