气流 DAG - 失败的任务未按预期显示失败状态



我刚开始使用Airflow DAG,遇到了该工具的一个奇怪问题。我使用的是带有SequentialExecutor的气流版本2.3.3。

我使用的脚本:

import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
dag_args = {
'owner': 'hao',
'retries': 2,
'retry_delay': datetime.timedelta(minutes=1)
}
with DAG(
dag_id='dependency_experiment',
default_args=dag_args,
description='experiment the dag task denpendency expression',
start_date=datetime.datetime.now(),
schedule_interval='@daily',
dagrun_timeout=datetime.timedelta(seconds=10),
) as dag:
pyOp = PythonOperator(
task_id='pyOp',
python_callable=lambda x: haha * x,
op_kwargs={'x': 10}
)
pyOp

此任务的日志片段:

名称错误:名称"haha"未定义

[2022-07-27,18:19:34EDT]{taskinstance.py:1415}信息-将任务标记为UP_FOR_RETRY。dag_id=dependency_experient,task_id=pyOp,execution_de=20220728T021932,start_date=20220728 T021934,end_date=202 20728 T0219 34

[2022-07-27,18:19:34EDT]{standard_task_runner.py:92}错误-无法执行任务pyOp的作业44(名称"haha"未定义;19405(

[2022-07-27,18:19:34EDT]{local_task_job.py:156}信息-任务已退出,返回代码为1

[2022-07-27,18:19:34EDT]{local_task_job.py:273}信息-根据后续计划检查安排的0个下游任务

问题:我有目的地定义了一个PythonOperator,但它会失败。当我将脚本放在DAG上时,该任务如预期的那样引发了一个异常;然而,该任务的状态始终是skipped.。我不明白为什么该任务没有按预期显示failed状态。任何建议都将不胜感激。

这是因为您在dag_args字典中定义了'retries''retry_delay'

来自文档:

default_args(可选[Dict](–初始化运算符时用作构造函数关键字参数的默认参数字典请注意,运算符具有相同的钩子,并且位于此处定义的钩子之前,这意味着如果您的dict在运算符的调用default_args中包含'dependens_on_perst':True和'dependent_on_perse':False,则实际值将为False。

当您将'retries'设置为某个值时,Airflow会认为该任务将在其他时间重试。所以它在UI中显示为跳过。

如果从dag_args中删除'retries''retry_delay',则在尝试启动DAG时,会看到该任务设置为失败。


当我在日志中运行您的代码时,我看到:

INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T060953, start_date=20220729T060953, end_date=20220729T060953

删除'retries''retry_delay'后,相同的日志变为:

INFO - Marking task as FAILED. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T061031, start_date=20220729T061031, end_date=20220729T061031

相关内容

  • 没有找到相关文章

最新更新