我希望 DAG 中的任务在执行下一次运行的第一个任务之前全部完成。
我有 max_active_runs = 1,但这仍然会发生。
default_args = {
'depends_on_past': True,
'wait_for_downstream': True,
'max_active_runs': 1,
'start_date': datetime(2018, 03, 04),
'owner': 't.n',
'email': ['t.n@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=4)
}
dag = DAG('example', default_args=default_args, schedule_interval = schedule_interval)
(我的所有任务都依赖于上一个任务。气流版本为1.8.0(
谢谢
您已将'max_active_runs': 1
放入default_args
参数中,而不是正确的位置。
max_active_runs
是 DAG 的构造函数参数,不应放入default_args
字典中。
下面是一个示例 DAG,其中显示了需要将其移动到的位置:
dag_args = {
'owner': 'Owner',
# 'max_active_runs': 1, # <--- Here is where you had it.
'depends_on_past': False,
'start_date': datetime(2018, 01, 1, 12, 00),
'email_on_failure': False
}
sched = timedelta(hours=1)
dag = DAG(
job_id,
default_args=dag_args,
schedule_interval=sched,
max_active_runs=1 # <---- Here is where it is supposed to be
)
如果您的 dag 正在运行的任务实际上是子 dag,那么您可能也需要将max_active_runs
传递到子 dag,但不能 100% 确定这一点。
我改为将max_active_runs
作为DAG()
的参数而不是default_arguments,它起作用了。
谢谢SimonD给我这个想法,尽管在你的回答中没有直接指出它。
你可以使用 xcoms 来做到这一点。首先将 2 个 python 运算符作为 DAG 的"开始"和"结束"。将流设置为:
开始--->所有任务---->结束
"end"将始终推送一个变量
last_success = 上下文['execution_date']到XCOM (xcom_push(。(在 PythonOperators 中需要 provide_context = true(。
"start"将始终检查xcom(xcom_pull(,以查看是否存在一个last_success变量,其值等于前一个DagRun的execution_date或DAG的start_date(让进程启动(。
遵循此答案
实际上你应该使用 DAG_CONCURRENCY=1 作为环境变量。