是否有可能在气流中获得先前尝试任务的错误?



我有一个配置为重试3次的任务。如果原始故障的异常属于某种类型,我想执行一些逻辑。例如,是否有可能从任务的第2次运行中提取第一次尝试中的异常?

我不认为有一个现有的方法来读取日志/错误为其他任务或旧的尝试为相同的任务,因为这些数据不保存在元数据

但是如果日志文件保存在本地/远程存储中,您可以读取前一次尝试的日志文件并解析它以获得错误:

previous_try_log_filename = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ ti.try_number }}.log

但您需要检查{{ ti.try_number }} != "1"是否在第一次尝试时跳过此步骤。

您可以使用on_retry_callback,它在任务准备重试时被调用,以便检索每次失败引发的异常并根据您的需要执行一些逻辑。

示例:

import airflow
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def task_retry(context):
ti = context['task_instance']
exception = context['exception']
# your logic using exception and task_instance attributes (ti.task_id, ti.dag_id, ti.operator, ti.log_url...)
# attributes of task_instance are listed here : https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html

def example_function():
raise ValueError('A very specific bad thing happened.')

with airflow.DAG(
dag_id='example_dag',
default_args={
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(60),
"start_date": datetime.combine(datetime.today() - timedelta(1), datetime.min.time()),
"on_retry_callback": task_retry,
},
schedule_interval=None
) as dag:
example_task = PythonOperator(
task_id='example_task',
python_callable=example_function,
provide_context=True,
)

最新更新