在特定日期的列表上执行气流 DAG 实例(任务)



我想使用 Apache 气流管理几个未来的版本。所有这些版本都是提前知道的,我需要确保一些数据推送不会被遗忘。

问题是那些未来的版本不遵循一个简单的定期时间表,可以用像0 1 23 * *这样的经典 cron 或类似 @monthly 来处理。

它相当2019-08-242019-09-302019-10-20...

除了为所有这些未来版本创建单独的mydag.py文件之外,还有其他方法吗?执行此操作的标准/推荐方法是什么?我是否以错误的方式思考这个问题(我想知道,因为文档和教程更侧重于常规的、周期性的事情(?

我可以想到两种简单的方法来做到这一点

  1. 创建 3-4 个顶级 DAG,每个 DAG 具有特定的start_date = 2019-08-24、2019-09-30... 和schedule_interval='@once'

  2. 创建具有schedule_interval=None的单个顶级 DAG(start_date可以是任何内容(。然后创建一个"触发dag",它使用TriggerDagRunOperator在特定日期有条件地触发您的实际工作流程

显然上面的方法2更好

您可以为 DAG 提供一个@daily计划,然后使用 ShortCircuitOperator 任务启动它,该任务检查执行日期是否与发布日期匹配。如果是,则通过检查,DAG 将运行。否则,它将跳过整个 DAG,并且不会发生任何发布。请参阅在 https://github.com/apache/airflow/blob/1.10.3/airflow/example_dags/example_short_circuit_operator.py 中使用此运算符的示例。

我想它看起来像这样:

RELEASE_DATES = ['2019-08-24', '2019-09-30', '2019-10-20']
dag = DAG(
    dag_id='my_dag',
    schedule_interval='@daily', 
    default_args=default_args,
)
def check_release_date(**context):
    # pass if it's a release day
    return context['ds'] in RELEASE_DATES
skip_if_not_release_date = ShortCircuitOperator(
    task_id='skip_if_not_release_date',
    python_callable=check_release_date,
    dag=dag,
    provide_context=True,
)

如果发布日期可以更改,那么您可能希望使用变量使其更加动态,以使更新变得容易。

def check_release_date(**context):
    release_dates = Variable.get('release_dates', deserialize_json=True)
    return context['ds'] in RELEASE_DATES

此外,如果出于任何原因需要重写发布日期的硬编码列表,则可以将此任务标记为成功以强制 DAG 运行。

最新更新