创建一个传感器,检查在指定的GCS位置匹配通配符的对象



我需要创建一个使用通配符在GCS桶中感知对象的日期。例如,任务应该每隔1小时检查一个文件是否存在于指定的GCS位置,如果文件匹配提供的通配符,它应该触发下一个任务,否则它应该被标记为时间表,并在1小时后再次检查。

通配符在gcs钩子和传感器中是不支持的,要做到这一点,你可以创建一个自定义的传感器,它使用钩子airflow.providers.google.cloud.hooks.gcs.GCSHook列出所有文件的前缀,然后使用regix检查是否有一个文件路径匹配提供的通配符。

from re import match
from airflow.providers.google.cloud.hooks.gcs import GCSHook
class CustomGcsSensor(BaseSensorOperator):
...
def poke(self, context):
prefix = "some/constant/prefix/" # used to reduce the list size, you can skip it if you have regex in all the prefix
wildcard = ".*/images/.*.jpg" # */images/*.jpg
gcs_hook = GCSHook(...) # use a connection
files = gcs_hook.list(bucket_name=..., prefix=prefix) # for some use cases you can use a delimiter like delimiter='.jpg'
matched_files = list(filter(lambda file_path: match(wildcard, file_path.replace(prefix, "")), files))
return len(matched_files) > 0

最新更新