有没有办法在airflow中配置自定义触发规则?
下面是我的问题陈述示例:
假设我有4个任务具有以下沿袭(依赖项(:
[Task1, Task2, Task3] >> Task4
在这里,我只想在Task1
失败和Task2
以及Task3
获得成功时触发Task4
。
我知道气流可能提供的触发规则。但没有得到任何可以处理上述情况的规则。有什么变通办法或解决方案吗?
目前还没有这样做的规则,但有一些简单的解决方案可以实现您的目标:
-
使用规则
all_failed
:在Task1和Task4之间添加EmptyOperator(DummyOperator(任务Task1 >> Empty [Empty, Task2, Task3] >> Task4
在这种情况下,如果Task1是
success
,则不会触发Empty。 -
在Task4之前添加ShortCircuitOperator任务,以检查其他任务的状态:
您可以使用简单的python方法检查这3个任务的状态,然后决定是否触发Task4:def get_state(task_id, **context): return context["dag_run"].get_task_instance(task_id).state ShortCircuitOperator( task_id="check_logic", python_callable=lambda: get_state("Task1") == "failed" and get_state("Task2") == "success" and get_state("Task3") == "success", provide_context=True, )