筛选GCS桶文件夹中的文件,使用Dataflow删除0字节文件



我目前正试图删除谷歌云存储桶内所有0字节的文件。我希望能够做到这一点与apache beam和数据流运行器,将运行上谷歌云项目。我现在所拥有的是这个(我已经隐藏了<***>的一些细节):

import apache_beam as beam
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions
class DetectEmpty(beam.DoFn):
def process(self, file_path):
if gfs.size(file_path) == 0:
yield file_path
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', default=<***>, help='<***>')
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = '<***>'
google_cloud_options.job_name = '<***>'
options.view_as(StandardOptions).runner = 'DataflowRunner'
gfs = gcs.GCSFileSystem(pipeline_options)
p = beam.Pipeline(options=pipeline_options)
images = p | 'read directory' >> ReadFromText(known_args.input)
empty_images = images | 'discover empty files' >> beam.ParDo(DetectEmpty())
p.run()

我的一些问题是:

  • 这是正确的方式去做这个任务吗?
  • 如何将apache_beam.io.gcp.gcsfilesystem.GCSFileSystem传递给DoFn
  • 此外,我想删除所有只包含0字节文件的文件夹。我该怎么做呢?

您不需要为了检测空文件而实际读取文件,您可以直接使用FileSystem对象来检查文件大小并根据需要删除。match()函数返回的FileMetadata对象包括文件的大小。

之类的
class DeleteEmpty(beam.DoFn):
def __init__(self, gfs):
self.gfs = gfs
def process(self, file_metadata):
if file_metadata.size_in_bytes == 0:
gfs.delete([file_metadata.path])
files = p | 'Filenames' >> beam.Create(gfs.match([<directory glob pattern>]).metadata_list)
| 'Reshuffle' >> beam.Reshuffle() # this allows the downstream code to be parallelized after the Create
| 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs))

GCS没有真正的文件夹;它们只是在使用UI或gsutil时添加的便利。当文件夹中没有对象时,该文件夹就不存在。见https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork

最新更新