如何在Airflow中设置自定义触发规则



有没有办法在airflow中配置自定义触发规则?

下面是我的问题陈述示例:

假设我有4个任务具有以下沿袭(依赖项(:

[Task1, Task2, Task3] >> Task4

在这里,我只想在Task1失败Task2以及Task3获得成功时触发Task4

我知道气流可能提供的触发规则。但没有得到任何可以处理上述情况的规则。有什么变通办法或解决方案吗?

目前还没有这样做的规则,但有一些简单的解决方案可以实现您的目标:

  1. 使用规则all_failed:在Task1和Task4之间添加EmptyOperator(DummyOperator(任务

    Task1 >> Empty  
    [Empty, Task2, Task3] >> Task4
    

    在这种情况下,如果Task1是success,则不会触发Empty。

  2. 在Task4之前添加ShortCircuitOperator任务,以检查其他任务的状态:
    您可以使用简单的python方法检查这3个任务的状态,然后决定是否触发Task4:

    def get_state(task_id, **context):
    return context["dag_run"].get_task_instance(task_id).state
    ShortCircuitOperator(
    task_id="check_logic",
    python_callable=lambda: get_state("Task1") == "failed" and get_state("Task2") == "success" and get_state("Task3") == "success",
    provide_context=True,
    )
    

最新更新