按数据流的gcs桶大小列出文件夹



查看这个问题的代码,我希望能够创建一个数据流管道,可以查看特定gcs桶文件夹内的所有文件,并以字节表示具有最大数据量的最终子目录。我写的代码类似于:

class SortFiles(beam.DoFn):
def __init__(self, gfs):
self.gfs = gfs
def process(self, file_metadata):
if file_metadata.size_in_bytes > 0:
# Sort the files here? 

class SortFolders(beam.DoFn):
def __init__(self, gfs):
self.gfs = gfs
def process(self, file_metadata):
if file_metadata.size_in_bytes > 0:
# Sort the folders here based on maximum addition of a combination 
# of the file sizes and file numbers 

def delete_empty_files():
options = PipelineOptions(...)
gfs = gcs.GCSFileSystem(pipeline_options)
p = beam.Pipeline(options=pipeline_options)
discover_empty = p | 'Filenames' >> beam.Create(gfs.match(gs_folder).metadata_list)
| 'Reshuffle' >> beam.Reshuffle() 
| 'SortFilesbySize' >> beam.ParDo(SortFiles(gfs))
| 'SortFoldersbySize' >> beam.ParDo(SortFolders(gfs))
| 'OutputFolders' >> ...

我还没有决定是按总字节数还是按其中的文件总数列出文件夹。我该如何解决这个问题呢?另一个问题在于,我希望能够找到这个任务的最终子目录,而不是它的父文件夹。

GCSFileSystem有一个函数du,它将告诉您特定路径下的大小。https://gcsfs.readthedocs.io/en/latest/api.html?highlight=du gcsfs.core.GCSFileSystem

在阅读你的问题时,我认为你想

  1. 首先查找bucket中不包含目录的所有目录(如果我理解为"最终子目录")
  2. 然后在它们每个上运行du
  3. 然后按大小对结果列表进行排序

如果你想对嵌套的文件进行计数:

  1. 列出所有对象,名称将是a/, a/b.txt, a/b/c.txt等
  2. 写一个函数来计算嵌套在每个子路径下的对象

相关内容

  • 没有找到相关文章

最新更新