我已经多次阅读了关于schedule_interval
、start_date
和Airflow文档的多个示例,但我仍然无法理解:
如何在每天的特定时间执行DAG?E.g说现在是9:30(上午(,我部署我的DAG,我希望它在10:30 执行
我试过
with DAG(
"test",
default_args=default_args,
description= "test",
schedule_interval = "0 10 * * *",
start_date = days_ago(0),
tags = ["goodie"]) as dag:
但由于某种原因,今天没有运行。我尝试了不同的start_dates
和start_date = datetime.datetime(2021,6,23)
,但没有执行。
如果我用days_ago(1)
替换days_ago(0)
,它一直落后1天,即今天没有运行,但昨天运行了
难道没有一种简单的方式来说";我现在部署我的DAG,我想用这个cron语法来执行它;(我认为这是大多数人想要的(而不是计算执行时间,基于start_date
、schedule_interval
并弄清楚,如何解释它?
如果我用days_ago(1(替换days_agon(0(,它一直落后于1天
它没有落后。你只是混淆了气流调度机械化和cron作业。在cron作业中,你只需要提供一个cron表达式,它就会相应地进行调度——这不是它在Airflow中的工作方式。
在Airflow中,按start_date
+schedule interval
计算调度。气流在间隔结束时执行作业。这与数据管道通常的工作方式一致。今天您正在处理昨天的数据,因此在当天结束时,您希望启动一个处理昨天记录的过程。
作为一条规则-永远不要使用动态开始日期。
设置:
with DAG(
"test",
default_args=default_args,
description= "test",
schedule_interval = "0 10 * * *",
start_date = datetime(2021,06,23, 10 ,0), # 2021-06-23 10:00
tags = ["goodie"]) as dag:
意味着第一个将在2021-06-24 10:00
上开始,此运行execution_date
将是2021-06-23 10:00
。第二次运行将从2021-06-25 10:00
开始,此运行execution_date
将是2021-06-24 10:00
由于这对许多新用户来说是一个困惑的来源,AIP-39 Richer scheduler_interval正在进行架构更改,它将在何时运行和本次运行要考虑的间隔之间进行分解。它将在Airflow 2.3.0 中提供
更新Airflow>=2.3.0
:AIP-39 Richer scheduler_interval已完成并发布它添加了时间表支持,因此您可以使用时间表自定义DAG调度