DAG是否可以在Airflow中检测到特定日期的首次运行



我每30分钟运行一次DAG。

假设这是DAG(为了简单起见,使用伪运算符):

dag = DAG(
dag_id='My_dag',
default_args=args,
schedule_interval=timedelta(minutes=30),
max_active_runs=1,
catchup=False,
)
start = DummyOperator(task_id='start_task', dag=dag)
to_do = DummyOperator(task_id='to_do_task ', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
start >> to_do >> end

现在,我想每天添加一次另一个操作员到工作流中,只在当天的第一次运行时执行。

假设它是:

once = DummyOperator(task_id='once_task ', dag=dag)
start >> once

这意味着该CCD_ 1将每24小时执行一次。

我不能用PythonBranchOperator这样做,因为我不能用这样的东西

if execution_date == midnigt

因为我不知道第一次执行死刑的时间。可以是00:01,也可以是00:17等等。

有没有办法检查这是否是每次执行日期的第一次运行?我听起来像TimeSensor,但我找不到如何使用文档。有可能戳到相同的DAG吗?

您可以检查上一个执行日期(prev_ds宏),并将其与BranchPythonOperator中的当前执行日期(ds宏)进行比较。示例:

start = DummyOperator(task_id='start_task', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
once = DummyOperator(task_id='once_task', dag=dag)
dummy_task_id_that_does_nothing = DummyOperator(task_id='dummy_task_id_that_does_nothing', dag=dag)
def check_if_task_already_ran(**context):
ds = context.get('ds')
prev_ds = context.get('prev_ds')
print(context)
print(ds)
print(prev_ds)
if prev_ds == ds:
return 'dummy_task_id_that_does_nothing' #task_id
else:
return 'once_task'    # Task that would just be executed once in a day

compare_ds = BranchPythonOperator(
task_id='compare_ds',
provide_context=True,
python_callable=check_if_task_already_ran,
dag=dag)

start >> compare_ds
compare_ds >> once >> end
compare_ds >> dummy_task_id_that_does_nothing >> end

最新更新