如何将 Airflow 限制为一次仅运行 DAG 运行的一个实例



我希望 DAG 中的任务在执行下一次运行的第一个任务之前全部完成。

我有 max_active_runs = 1,但这仍然会发生。

default_args = {
    'depends_on_past': True,
    'wait_for_downstream': True,
    'max_active_runs': 1,
    'start_date': datetime(2018, 03, 04),
    'owner': 't.n',
    'email': ['t.n@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=4)
}
dag = DAG('example', default_args=default_args, schedule_interval = schedule_interval)

(我的所有任务都依赖于上一个任务。气流版本为1.8.0(

谢谢

您已将'max_active_runs': 1放入default_args参数中,而不是正确的位置。

max_active_runs 是 DAG 的构造函数参数,不应放入default_args字典中。

下面是一个示例 DAG,其中显示了需要将其移动到的位置:

dag_args = { 
    'owner': 'Owner',
    # 'max_active_runs': 1, # <--- Here is where you had it.
    'depends_on_past': False,
    'start_date': datetime(2018, 01, 1, 12, 00),
    'email_on_failure': False
}
sched = timedelta(hours=1)
dag = DAG(
          job_id, 
          default_args=dag_args, 
          schedule_interval=sched, 
          max_active_runs=1 # <---- Here is where it is supposed to be
      ) 

如果您的 dag 正在运行的任务实际上是子 dag,那么您可能也需要将max_active_runs传递到子 dag,但不能 100% 确定这一点。

我改为将max_active_runs作为DAG()的参数而不是default_arguments,它起作用了。

谢谢SimonD给我这个想法,尽管在你的回答中没有直接指出它。

你可以使用 xcoms 来做到这一点。首先将 2 个 python 运算符作为 DAG 的"开始"和"结束"。将流设置为:

开始--->所有任务---->结束

"end"将始终推送一个变量

last_success = 上下文['execution_date']到XCOM (xcom_push(。(在 PythonOperators 中需要 provide_context = true(。

"start"将始终检查xcom(xcom_pull(,以查看是否存在一个last_success变量,其值等于前一个DagRun的execution_date或DAG的start_date(让进程启动(。

遵循此答案

实际上你应该使用 DAG_CONCURRENCY=1 作为环境变量。

最新更新