我有一个简单的DAG
,有2个PythonOperator
,计划间隔为2分钟:
with DAG(dag_id='example_cron', schedule_interval='*/2 * * * *', start_date=days_ago(2)) as dag:
def task1_func(ti):
print("start task 1")
time.sleep(random.randint(0, 70))
print("end task 1")
def task2_func(ti):
print("start task 2")
time.sleep(random.randint(0, 70))
print("end task 2")
task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task1 >> task2
DAG
可以运行2分钟以上,这意味着多个DAG
可以并行运行
如何将DAG配置为在上一次运行完成后运行?
您只需要将max_active_runs=1
添加到DAG对象中。
with DAG(..., max_active_runs=1) as dag:
不属于您的问题,但请注意,days_ago(2)
已被弃用,在任何情况下,您都不应将动态日期用于start_date
(请参阅文档(