人们上传文件到s3桶,我需要能够以编程方式压缩某些文件。
我使用的是Fargate,很多时候所有需要压缩的文件总和超过300GB。因此,重要的是要将文件从S3流式传输,并将zip文件流式传输回S3,因为没有足够的磁盘空间或内存来一次保存所有内容。
我在StackOverflow上找到了两个答案,但都没有起作用,我无法找出故障排除后的原因。
第一个是:
from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
import boto3
session = boto3.Session(aws_access_key_id='x', aws_secret_access_key='x', region_name='us-east-2')
s3 = boto3.client('s3')
bucket_name = 'x'
class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self, b):
if self.closed:
raise ValueError('The stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk
def zipfile_generator(path, stream):
with ZipFile(stream, mode='w') as zip_archive:
z_info = ZipInfo.from_file(path)
z_info.compress_type = ZIP_DEFLATED
with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest:
for chunk in iter(lambda: entry.read(16384), b''):
dest.write(chunk)
yield stream.get()
yield stream.get()
items_to_zip = ['file1.jpg', 'file2.jpg', 'file3.jpg']
stream = UnseekableStream()
with open("test.zip", "wb") as f:
for file in items_to_zip:
obj = s3.get_object(Bucket=bucket_name, Key=file)
for i in zipfile_generator(obj.get(obj), stream):
f.write(i)
f.flush()
stream.close()
f.close()
这个给了我一个错误:对于zipfile_generator(obj.get(obj), stream)中的I:TypeError: unhashable type: 'dict'
第二个是:
import boto3
import smart_open
from smart_open import s3
session = boto3.Session()
source_bucket_name = "x"
bucket = session.resource('s3').Bucket(source_bucket_name)
prefix = "xx" # s3 prefix for the files under a "folder"
output_path = "s3://xx/streamedzip.zip"
with smart_open.open(output_path, 'wb') as fout:
for key, content in s3.iter_bucket(source_bucket_name, prefix = prefix):
fout.write(content)
这个将文件上传到S3,但它似乎是一个损坏的zip文件。
我不知道从这里该往哪里走。
谢谢
对于第二种方法,您必须为zip-File使用另一个上下文管理器:
with smart_open.open(output_path, 'wb') as fout:
with zipfile.ZipFile(fout, 'w') as zip:
for key, content in s3.iter_bucket(source_bucket_name, prefix = prefix):
zip.writestr(key, content)