Deleting objects from GCS with Airflow using a matching pattern



I am trying to delete objects that match a certain pattern from my bucket, but it does not seem to work as expected. Any help is appreciated.

delete_data = GCSDeleteObjectsOperator(
    bucket_name=BUCKET_NAME,
    task_id="delete_data",
    objects=['test_delete/*/*/*/alpha/data-1-2123-*.json']
)

Airflow always throws a 404 "object not found" error. I can confirm that objects matching this pattern exist in the bucket.

You can use GCSHook to list the objects in the bucket, then filter them against the pattern with a regular expression.

Here is the sample code I used in my test:

import datetime
import re

import airflow
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

bucket_name = 'your-bucket'
# Each [a-zA-Z0-9_-]+ stands in for one "*" path segment; the dot before "json" is escaped.
object_pattern = r'test_delete/[a-zA-Z0-9_-]+/[a-zA-Z0-9_-]+/[a-zA-Z0-9_-]+/alpha/data-1-2123-[a-zA-Z0-9_-]+\.json'


def get_and_delete_obj():
    hook = GCSHook()
    # List only the objects under the common prefix, then filter them by regex.
    objects = hook.list(bucket_name=bucket_name, prefix='test_delete/')
    # fullmatch requires the whole object name to match the pattern.
    matching_objects = [obj for obj in objects if re.fullmatch(object_pattern, obj)]
    print(matching_objects)

    for obj in matching_objects:
        hook.delete(bucket_name=bucket_name, object_name=obj)


with airflow.DAG(
        'test_delete_dag',
        start_date=datetime.datetime(2021, 1, 1),
        schedule_interval=None) as dag:

    # The callable takes no arguments, so no context needs to be passed in.
    test_delete = PythonOperator(
        task_id='delete_gcs_obj',
        python_callable=get_and_delete_obj,
    )
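
If you would rather keep the original wildcard string than hand-write the regex, the filtering step can be sketched in plain Python. This is only an illustration: glob_to_regex and filter_names are hypothetical helper names, the object names are made up, and it assumes "*" should match within a single path segment (it never crosses a "/").

```python
import re


def glob_to_regex(glob_pattern: str) -> str:
    """Translate a glob where "*" stays within one path segment into a regex."""
    # Escape each literal piece, then join the pieces with "[^/]*" (no slashes allowed).
    return "[^/]*".join(re.escape(part) for part in glob_pattern.split("*"))


def filter_names(names, glob_pattern):
    """Return the names that match the glob pattern in full."""
    regex = re.compile(glob_to_regex(glob_pattern))
    return [name for name in names if regex.fullmatch(name)]


# Hypothetical object names, for illustration only.
sample_names = [
    "test_delete/a/b/c/alpha/data-1-2123-001.json",
    "test_delete/a/b/c/alpha/data-1-2123-001.json.bak",
    "test_delete/a/b/alpha/data-1-2123-001.json",
    "test_delete/a/b/c/beta/data-1-2123-001.json",
]
matches = filter_names(sample_names, "test_delete/*/*/*/alpha/data-1-2123-*.json")
print(matches)
```

Only the first sample name is selected here. The list returned by hook.list could be passed through filter_names in the same way before calling hook.delete.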

Take a look at the docstring of the Airflow operator. I am not sure it supports wildcards such as "*".

class GCSDeleteObjectsOperator(BaseOperator):
    """
    Deletes objects from a Google Cloud Storage bucket, either
    from an explicit list of object names or all objects
    matching a prefix.

    :param bucket_name: The GCS bucket to delete from
    :param objects: List of objects to delete. These should be the names
        of objects in the bucket, not including gs://bucket/
    :param prefix: Prefix of objects to delete. All objects matching this
        prefix in the bucket will be deleted.
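
Going by that docstring, entries in objects appear to be treated as literal object names, while prefix selects by leading string. A rough plain-Python model of the difference, which would be consistent with the 404 in the question (this is not the operator's actual code, and the object names are made up):

```python
# Hypothetical object names, for illustration only.
names = [
    "test_delete/a/b/c/alpha/data-1-2123-001.json",
    "test_delete/a/b/c/alpha/other.json",
    "keep/me.json",
]

wildcard = "test_delete/*/*/*/alpha/data-1-2123-*.json"

# Treated as a literal name, the wildcard string matches no object.
literal_hits = [n for n in names if n == wildcard]

# A prefix selects everything that starts with it, which is broader than the pattern.
prefix_hits = [n for n in names if n.startswith("test_delete/")]

print(literal_hits)
print(prefix_hits)
```

Note that a prefix such as test_delete/ would also remove other.json in this model, so it only replaces the wildcard when everything under the prefix should go.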
