我刚开始使用Airflow DAG,遇到了该工具的一个奇怪问题。我使用的是带有SequentialExecutor的气流版本2.3.3。
我使用的脚本:
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
dag_args = {
'owner': 'hao',
'retries': 2,
'retry_delay': datetime.timedelta(minutes=1)
}
with DAG(
dag_id='dependency_experiment',
default_args=dag_args,
description='experiment the dag task denpendency expression',
start_date=datetime.datetime.now(),
schedule_interval='@daily',
dagrun_timeout=datetime.timedelta(seconds=10),
) as dag:
pyOp = PythonOperator(
task_id='pyOp',
python_callable=lambda x: haha * x,
op_kwargs={'x': 10}
)
pyOp
此任务的日志片段:
名称错误:名称"haha"未定义
[2022-07-27,18:19:34EDT]{taskinstance.py:1415}信息-将任务标记为UP_FOR_RETRY。dag_id=dependency_experient,task_id=pyOp,execution_de=20220728T021932,start_date=20220728 T021934,end_date=202 20728 T0219 34
[2022-07-27,18:19:34EDT]{standard_task_runner.py:92}错误-无法执行任务pyOp的作业44(名称"haha"未定义;19405(
[2022-07-27,18:19:34EDT]{local_task_job.py:156}信息-任务已退出,返回代码为1
[2022-07-27,18:19:34EDT]{local_task_job.py:273}信息-根据后续计划检查安排的0个下游任务
问题:我有目的地定义了一个PythonOperator,但它会失败。当我将脚本放在DAG上时,该任务如预期的那样引发了一个异常;然而,该任务的状态始终是skipped.
。我不明白为什么该任务没有按预期显示failed
状态。任何建议都将不胜感激。
这是因为您在dag_args字典中定义了'retries'
和'retry_delay'
。
来自文档:
default_args(可选[Dict](–初始化运算符时用作构造函数关键字参数的默认参数字典请注意,运算符具有相同的钩子,并且位于此处定义的钩子之前,这意味着如果您的dict在运算符的调用default_args中包含'dependens_on_perst':True和'dependent_on_perse':False,则实际值将为False。
当您将'retries'
设置为某个值时,Airflow会认为该任务将在其他时间重试。所以它在UI中显示为跳过。
如果从dag_args
中删除'retries'
和'retry_delay'
,则在尝试启动DAG时,会看到该任务设置为失败。
当我在日志中运行您的代码时,我看到:
INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T060953, start_date=20220729T060953, end_date=20220729T060953
删除'retries'
和'retry_delay'
后,相同的日志变为:
INFO - Marking task as FAILED. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T061031, start_date=20220729T061031, end_date=20220729T061031