Airflow - How to get a list of failed upstream tasks



Consider a DAG in which tasks A, B, and C share one downstream task, all_success, which waits for all upstream tasks to succeed.

a = BashOperator(task_id='a', bash_command='exit 1')
b = BashOperator(task_id='b', bash_command='exit 0')
c = BashOperator(task_id='c', bash_command='exit 1')
all_success = DummyOperator(task_id='all_success', trigger_rule='all_success')
a >> all_success
b >> all_success
c >> all_success

What can I replace all_success with so that it prints (to stdout, from code) a list of all the upstream tasks that failed in that particular DAG run?

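Edit: I know I can see this in the UI, but I want the list on stdout. My use case is that I have some huge DAGs with hundreds of tasks that all feed into a single downstream task. If any upstream task fails, it is hard to scroll through the UI to find all of the failed ones.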

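OK, so I ran into the same problem when I wanted to report failed tasks to an external system. After an hour or so of thinking it turned out to be quite simple. Being new to Airflow, I still mix up Task and TaskInstance, but anyway, here is the recipe: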


import logging

from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.operators.python import PythonOperator, get_current_context
from airflow.utils.trigger_rule import TriggerRule


def get_failed_upstream_tasks():
    # We need both the current run and the current task instance
    dag_run: DagRun = get_current_context().get('dag_run')
    ti: TaskInstance = get_current_context().get('ti')
    # This is what took me a while - to figure out that the direct relatives
    # are defined on the Task and not on the TaskInstance. True - for upstream
    upstream_tasks: list[BaseOperator] = ti.task.get_direct_relatives(True)
    # A set of upstream task ids keeps the membership test cheap when there are many upstream tasks
    upstream_task_ids = {t.task_id for t in upstream_tasks}
    # Then we grab all of the failed task instances in the current run; some of them may be unrelated to this task
    failed_task_instances = dag_run.get_task_instances(state='failed')
    # That's why we intersect with the direct upstream relatives
    failed_upstream_task_ids = [failed_task.task_id for failed_task in failed_task_instances
                                if failed_task.task_id in upstream_task_ids]
    # Finally we can log it, or do whatever we like with it
    for failed_task_id in failed_upstream_task_ids:
        logging.info(f'The task "{failed_task_id}" has failed :(')

...
report_failed_task = PythonOperator(task_id='report_failed_task',
                                    python_callable=get_failed_upstream_tasks,
                                    trigger_rule=TriggerRule.ONE_FAILED)
a >> report_failed_task
b >> report_failed_task
c >> report_failed_task
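
The filtering step in the recipe above does not depend on Airflow itself, so it can be pulled into a small helper and checked without a scheduler. This is only a sketch: the helper name select_failed_upstream and the sample task ids are made up for illustration, and the inputs are assumed to be plain task_id strings taken from ti.task.get_direct_relatives(True) and dag_run.get_task_instances(state='failed').

```python
from typing import Iterable, List, Set


def select_failed_upstream(upstream_task_ids: Set[str],
                           failed_task_ids: Iterable[str]) -> List[str]:
    """Return the failed task ids that are direct upstream relatives.

    Hypothetical helper: both arguments are plain task_id strings.
    The result is sorted so that the log output is stable between runs.
    """
    return sorted(set(failed_task_ids) & upstream_task_ids)


if __name__ == "__main__":
    # Sample ids for illustration only: 'cleanup' failed but is not upstream.
    upstream = {"a", "b", "c"}
    failed = ["c", "cleanup", "a"]
    for task_id in select_failed_upstream(upstream, failed):
        print(f'The task "{task_id}" has failed :(')
```

Inside the callable this would be fed with {t.task_id for t in upstream_tasks} and the task_id of each failed task instance.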

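A few notes:

  • I set TriggerRule.ONE_FAILED so that this task is triggered only when there is a failure. It can be adjusted to always run, even when the upstream results are a mix of successes and failures.

  • I would not be surprised if there is a simpler way.

  • get_current_context().get('task') instead of ti.task should work as well, but I have not tested it.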
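
On the first note: if the reporting task is switched to TriggerRule.ALL_DONE so that it always runs, the callable also has to cope with runs where nothing failed. A rough sketch of that handling in plain Python follows. The function names and the sample states are my own, and the input is assumed to be a task_id to state mapping built from dag_run.get_task_instances(), using the state strings Airflow stores, such as 'success', 'failed' and 'upstream_failed'.

```python
from collections import defaultdict
from typing import Dict, List, Mapping


def group_upstream_by_state(upstream_states: Mapping[str, str]) -> Dict[str, List[str]]:
    """Group task ids by their state string (hypothetical helper)."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for task_id, state in upstream_states.items():
        grouped[state].append(task_id)
    # Sort the ids so the report reads the same on every run.
    return {state: sorted(ids) for state, ids in grouped.items()}


def format_report(grouped: Mapping[str, List[str]]) -> str:
    """Build a one-line report, covering the case where nothing failed."""
    failed = grouped.get("failed", [])
    if not failed:
        return "No upstream task failed."
    return "Failed upstream tasks: " + ", ".join(failed)


if __name__ == "__main__":
    # Sample states for illustration only.
    states = {"a": "failed", "b": "success", "c": "failed"}
    print(format_report(group_upstream_by_state(states)))
```

With ONE_FAILED the empty branch should never be hit, so it only matters once the trigger rule is relaxed.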
