子任务中的 AirFlow dag id 访问



我有一个DAG,其中包含三个bash任务,计划每天运行。

我想在所有 bash 脚本中访问 dag 实例(可能是 PID)的唯一 ID。

有什么办法可以做到这一点吗?

我正在寻找与Oozie类似的功能,我们可以在其中访问工作流xml或java代码中的WORKFLOW_ID。

有人可以指出我有关"如何在AirFlow DAG中使用内置和自定义变量"的AirFlow文档吗?

非常感谢帕里

对象的属性可以在 jinja2 中使用点表示法访问(请参阅 https://airflow.apache.org/code.html#macros)。在这种情况下,它只是:

{{ dag.dag_id }}

我利用了 Python 对象dag打印出当前 DAG 的名称这一事实。 所以我只使用 Jinja2 来更改dag名称:

{{ dag | replace( '<DAG: ', '' ) | replace( '>', '' ) }}

有点黑客,但它有效。

因此

clear_upstream = BashOperator( task_id='clear_upstream',
    trigger_rule='all_failed',
    bash_command="""
      echo airflow clear -t upstream_task -c -d -s {{ ts }} -e {{ ts }} {{ dag | replace( '<DAG: ', '' ) | replace( '>', '' ) }}
    """
)

最新更新