每天在特定时间运行DAG



我已经多次阅读了关于schedule_intervalstart_date和Airflow文档的多个示例,但我仍然无法理解:

如何在每天的特定时间执行DAG?E.g说现在是9:30(上午(,我部署我的DAG,我希望它在10:30 执行

我试过


with DAG(
"test",
default_args=default_args,
description= "test",
schedule_interval = "0 10 * * *",
start_date = days_ago(0),
tags = ["goodie"]) as dag:

但由于某种原因,今天没有运行。我尝试了不同的start_datesstart_date = datetime.datetime(2021,6,23),但没有执行。

如果我用days_ago(1)替换days_ago(0),它一直落后1天,即今天没有运行,但昨天运行了

难道没有一种简单的方式来说";我现在部署我的DAG,我想用这个cron语法来执行它;(我认为这是大多数人想要的(而不是计算执行时间,基于start_dateschedule_interval并弄清楚,如何解释它?

如果我用days_ago(1(替换days_agon(0(,它一直落后于1天

它没有落后。你只是混淆了气流调度机械化和cron作业。在cron作业中,你只需要提供一个cron表达式,它就会相应地进行调度——这不是它在Airflow中的工作方式。

在Airflow中,按start_date+schedule interval计算调度。气流在间隔结束时执行作业。这与数据管道通常的工作方式一致。今天您正在处理昨天的数据,因此在当天结束时,您希望启动一个处理昨天记录的过程。

作为一条规则-永远不要使用动态开始日期。

设置:

with DAG(
"test",
default_args=default_args,
description= "test",
schedule_interval = "0 10 * * *",
start_date = datetime(2021,06,23, 10 ,0), # 2021-06-23 10:00
tags = ["goodie"]) as dag:

意味着第一个将在2021-06-24 10:00上开始,此运行execution_date将是2021-06-23 10:00。第二次运行将从2021-06-25 10:00开始,此运行execution_date将是2021-06-24 10:00

由于这对许多新用户来说是一个困惑的来源,AIP-39 Richer scheduler_interval正在进行架构更改,它将在何时运行和本次运行要考虑的间隔之间进行分解。它将在Airflow 2.3.0 中提供

更新Airflow>=2.3.0:AIP-39 Richer scheduler_interval已完成并发布它添加了时间表支持,因此您可以使用时间表自定义DAG调度

最新更新