如何安排气流DAG在上一次运行DAG结束后运行



我有一个简单的DAG,有2个PythonOperator,计划间隔为2分钟:

with DAG(dag_id='example_cron', schedule_interval='*/2 * * * *', start_date=days_ago(2)) as dag:
def task1_func(ti):
print("start task 1")
time.sleep(random.randint(0, 70))
print("end task 1")
def task2_func(ti):
print("start task 2")
time.sleep(random.randint(0, 70))
print("end task 2")

task1  = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2  = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task1 >> task2
  • DAG可以运行2分钟以上,这意味着多个DAG可以并行运行

如何将DAG配置为在上一次运行完成后运行?

您只需要将max_active_runs=1添加到DAG对象中。

with DAG(..., max_active_runs=1) as dag:

不属于您的问题,但请注意,days_ago(2)已被弃用,在任何情况下,您都不应将动态日期用于start_date(请参阅文档(

最新更新