如何在 Airflow 中将父状态更改为成功后触发所有upstream_failed任务"retry"?



我有一个简单的例子,展示了一个具有两个级别的DAG。运行时,此 dag 失败,因为人为错误,有一个failed任务和一个upstream_fail的任务。

bug = True 
def process1(param):
    print("process 1 running {}".format(param))
    if bug and (param == 2):
        raise Exception("failure!!")

def process2(param):
    print("process 2 running {}".format(param))

with dag:
    for i in range(10):
        task1 = PythonOperator(
            task_id="process_1_{}".format(i),
            python_callable=process1,
            op_kwargs={'param': i}
        )
        task2 = PythonOperator(
            task_id="process_2_{}".format(i),
            python_callable=process2,
            op_kwargs={'param': i},
            trigger_rule=TriggerRule.ALL_SUCCESS,
            retries=2
        )
        task1 >> task2

现在让我们假设我修复了错误(bug = False)并尝试清除所有失败的任务:

airflow clear -s 2001 -e 2019 --only_failed test_resubmit

此命令清除任务test_resubmit.process_1_2,它将成功运行,但其下游(即 test_resubmit.process_2_2 )仍处于upstream_failed状态。如何触发所有任务在其父状态更改为成功后upstream_failed"重试"?

upstream_failed状态是结束状态,因此即使现在满足其依赖项也不会重试(与up_for_retry不同)。您需要传递--downstream以便也清除失败任务的下游任务。

查看 https://airflow.readthedocs.io/en/stable/cli.html#clear 的所有选项。

最新更新