我想使用 Apache 气流管理几个未来的版本。所有这些版本都是提前知道的,我需要确保一些数据推送不会被遗忘。
问题是那些未来的版本不遵循一个简单的定期时间表,可以用像0 1 23 * *
这样的经典 cron 或类似 @monthly
来处理。
它相当2019-08-24
,2019-09-30
,2019-10-20
...
除了为所有这些未来版本创建单独的mydag.py
文件之外,还有其他方法吗?执行此操作的标准/推荐方法是什么?我是否以错误的方式思考这个问题(我想知道,因为文档和教程更侧重于常规的、周期性的事情(?
我可以想到两种简单的方法来做到这一点
-
创建 3-4 个顶级 DAG,每个 DAG 具有特定的
start_date
= 2019-08-24、2019-09-30... 和schedule_interval='@once'
-
创建具有
schedule_interval=None
的单个顶级 DAG(start_date
可以是任何内容(。然后创建一个"触发dag",它使用TriggerDagRunOperator
在特定日期有条件地触发您的实际工作流程
显然上面的方法2更好
您可以为 DAG 提供一个@daily
计划,然后使用 ShortCircuitOperator 任务启动它,该任务检查执行日期是否与发布日期匹配。如果是,则通过检查,DAG 将运行。否则,它将跳过整个 DAG,并且不会发生任何发布。请参阅在 https://github.com/apache/airflow/blob/1.10.3/airflow/example_dags/example_short_circuit_operator.py 中使用此运算符的示例。
我想它看起来像这样:
RELEASE_DATES = ['2019-08-24', '2019-09-30', '2019-10-20']
dag = DAG(
dag_id='my_dag',
schedule_interval='@daily',
default_args=default_args,
)
def check_release_date(**context):
# pass if it's a release day
return context['ds'] in RELEASE_DATES
skip_if_not_release_date = ShortCircuitOperator(
task_id='skip_if_not_release_date',
python_callable=check_release_date,
dag=dag,
provide_context=True,
)
如果发布日期可以更改,那么您可能希望使用变量使其更加动态,以使更新变得容易。
def check_release_date(**context):
release_dates = Variable.get('release_dates', deserialize_json=True)
return context['ds'] in RELEASE_DATES
此外,如果出于任何原因需要重写发布日期的硬编码列表,则可以将此任务标记为成功以强制 DAG 运行。