如何在 Apache Airflow Dag 中添加手动任务



我正在使用Apache Airflow来管理数据处理管道。在管道的中间,在下一步处理之前需要审查一些数据。例如 ... -> task1 -> human review -> task2 -> ... 其中任务 1 和任务 2 是数据处理任务。任务 1 完成后,任务 1 生成的数据需要人工审核。审阅者批准数据后,可以启动任务 2。人工审核任务可能需要很长时间(例如几周(。

我正在考虑使用外部数据库来存储人工审核结果。并使用传感器按时间间隔戳审核结果。但是在审查完成之前,它将占用一个气流工作人员。

知道吗?

根据

Freedom 的答案和 Robert Elliot 的答案,这里有一个完整的工作示例,它让用户在永久失败之前有两周的时间查看第一个任务的结果:

from datetime import timedelta
from airflow.models import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator
from my_tasks import first_task_callable, second_task_callable

TIMEOUT = timedelta(days=14)

def task_to_fail():
    raise AirflowException("Please change this step to success to continue")

dag = DAG(dag_id="my_dag")
first_task = PythonOperator(
    dag=dag,
    task_id="first_task",
    python_callable=first_task_callable
)
manual_sign_off = PythonOperator(
    dag=dag,
    task_id="manual_sign_off",
    python_callable=task_to_fail,
    retries=1,
    max_retry_delay=TIMEOUT
)
second_task = PythonOperator(
    dag=dag,
    task_id="second_task",
    python_callable=second_task_callable
)
first_task >> manual_sign_off >> second_task

一位同事建议有一个总是失败的任务,所以手动步骤只是将其标记为成功。我按原样实现它:

def always_fail():
    raise AirflowException('Please change this step to success to continue')

manual_sign_off = PythonOperator(
    task_id='manual_sign_off',
    dag=dag,
    python_callable=always_fail
)
start >> manual_sign_off >> end

你的想法对我来说似乎很好。您可以创建专用 DAG 以使用传感器检查审批流程的进度。如果在传感器上使用低超时,并在此 DAG 上使用适当的计划,请说每 6 小时一次。根据这些任务的批准频率以及您需要执行下游任务的时间进行调整。

在 1.10 之前,我使用了运算符的重试功能来实现ManualSignOffTask。操作员已设置重试次数和retry_delay。因此,任务失败后将被重新安排。计划任务时,它将检查数据库以查看签核是否已完成:如果签核尚未完成,则任务将失败并释放工作人员并等待下一个计划。如果签核已完成,则任务成功,并且 dag 运行将继续。

在 1.10 之后,引入了新的 TI 状态UP_FOR_RESCHEDULE,并且传感器本机支持长时间运行的任务。

最新更新