气流中的文件依赖关系



我有一个气流DAG,我想在每个月的特定日期触发它。这是一个表迁移任务,所以我不想移动它,除非目录中的表已更新。

如何执行类似 DAG 仅在该目录中存在某些文件时才在当天执行的操作?或者有没有更好的方法可以做到这一点,例如检查文件是否已更新。

请向我建议一些解决方案和方法。

使用气流传感器。它是一种运算符,它将一直运行,直到满足某个标准。您可以使用S3KeySensor等待 S3 存储桶中存在密钥。

https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/s3_key_sensor.html

您可以将计划间隔设置为类似"0 0 1 * *"的内容 - 这将安排 DAG 每月在每月第一天的午夜运行一次。

> 存在一个名为 File_Sensor 的预定义传感器,它在称为"poke_interval"的特定间隔后检查指定目录中的文件,并在找到文件后运行。

from airflow.contrib.sensors.file_sensor import FileSensor
filesensor = FileSensor(task_id= "my_file_sensor_task", poke_interval= 300, filepath="file path", run_as_user="batchid", dag=dag)

poke_interval以秒为单位。

我们可以在主作业之前将其设置为上游以完成需求。

最新更新