我每30分钟运行一次DAG。
假设这是DAG(为了简单起见,使用伪运算符):
dag = DAG(
dag_id='My_dag',
default_args=args,
schedule_interval=timedelta(minutes=30),
max_active_runs=1,
catchup=False,
)
start = DummyOperator(task_id='start_task', dag=dag)
to_do = DummyOperator(task_id='to_do_task ', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
start >> to_do >> end
现在,我想每天添加一次另一个操作员到工作流中,只在当天的第一次运行时执行。
假设它是:
once = DummyOperator(task_id='once_task ', dag=dag)
start >> once
这意味着该CCD_ 1将每24小时执行一次。
我不能用PythonBranchOperator
这样做,因为我不能用这样的东西
if execution_date == midnigt
因为我不知道第一次执行死刑的时间。可以是00:01,也可以是00:17等等。
有没有办法检查这是否是每次执行日期的第一次运行?我听起来像TimeSensor
,但我找不到如何使用文档。有可能戳到相同的DAG吗?
您可以检查上一个执行日期(prev_ds
宏),并将其与BranchPythonOperator
中的当前执行日期(ds
宏)进行比较。示例:
start = DummyOperator(task_id='start_task', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
once = DummyOperator(task_id='once_task', dag=dag)
dummy_task_id_that_does_nothing = DummyOperator(task_id='dummy_task_id_that_does_nothing', dag=dag)
def check_if_task_already_ran(**context):
ds = context.get('ds')
prev_ds = context.get('prev_ds')
print(context)
print(ds)
print(prev_ds)
if prev_ds == ds:
return 'dummy_task_id_that_does_nothing' #task_id
else:
return 'once_task' # Task that would just be executed once in a day
compare_ds = BranchPythonOperator(
task_id='compare_ds',
provide_context=True,
python_callable=check_if_task_already_ran,
dag=dag)
start >> compare_ds
compare_ds >> once >> end
compare_ds >> dummy_task_id_that_does_nothing >> end