我有一个简单的例子,展示了一个具有两个级别的DAG。运行时,此 dag 失败,因为人为错误,有一个failed
任务和一个upstream_fail
的任务。
bug = True
def process1(param):
print("process 1 running {}".format(param))
if bug and (param == 2):
raise Exception("failure!!")
def process2(param):
print("process 2 running {}".format(param))
with dag:
for i in range(10):
task1 = PythonOperator(
task_id="process_1_{}".format(i),
python_callable=process1,
op_kwargs={'param': i}
)
task2 = PythonOperator(
task_id="process_2_{}".format(i),
python_callable=process2,
op_kwargs={'param': i},
trigger_rule=TriggerRule.ALL_SUCCESS,
retries=2
)
task1 >> task2
现在让我们假设我修复了错误(bug = False
)并尝试清除所有失败的任务:
airflow clear -s 2001 -e 2019 --only_failed test_resubmit
此命令清除任务test_resubmit.process_1_2
,它将成功运行,但其下游(即 test_resubmit.process_2_2
)仍处于upstream_failed
状态。如何触发所有任务在其父状态更改为成功后upstream_failed"重试"?
upstream_failed
状态是结束状态,因此即使现在满足其依赖项也不会重试(与up_for_retry
不同)。您需要传递--downstream
以便也清除失败任务的下游任务。
查看 https://airflow.readthedocs.io/en/stable/cli.html#clear 的所有选项。