Python:如何在不耗尽内存的情况下提取谷歌云存储中的Zip文件



我需要提取谷歌云存储中zip文件中的文件。我使用python函数来实现这一点,但即使在使用Dask集群时,我也会遇到内存问题,并且每个Dask工作程序都有20GB的内存限制。

我如何优化我的代码,使其不会消耗那么多内存?也许是分块读取zip文件,并将其流式传输到一个临时文件,然后将该文件发送到谷歌云存储?

如有任何指导,不胜感激。

这是我的代码:

@task
def unzip_files(
bucket_name,
zip_data
):
file_date = zip_data['file_date']
gcs_folder_path = zip_data['gcs_folder_path']
gcs_blob_name = zip_data['gcs_blob_name']
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
destination_blob_pathname = f'{gcs_folder_path}/{gcs_blob_name}'
blob = bucket.blob(destination_blob_pathname)
zipbytes = io.BytesIO(blob.download_as_string())
if is_zipfile(zipbytes):
with ZipFile(zipbytes, 'r') as zipObj:
extracted_file_paths = []
for content_file_name in zipObj.namelist():
content_file = zipObj.read(content_file_name)
extracted_file_path = f'{gcs_folder_path}/hgdata_{file_date}_{content_file_name}'
blob = bucket.blob(extracted_file_path)
blob.upload_from_string(content_file)
extracted_file_paths.append(f'gs://{bucket_name}/{extracted_file_path}')
return extracted_file_paths
else:
return []

我不太了解您的代码,但一般来说,dask使用fsspecgcsfs库可以很好地处理像这样的复杂文件操作。例如(你不需要Dask(

import fsspec
with fsspec.open_files("zip://*::gcs://gcs_folder_path/gcs_blob_name") as open_files:
for of in open_files:
with fsspec.open("gcs://{something from fo}", "wb") as f:
data = True
while data:
data = of.read(2**22)
f.write(data)

你可以改为

open_files = fssec.open_files(...)

并使循环与Dask平行。

最新更新