在AWS S3中分块创建大型zip文件



所以,这个问题最终是关于python和S3的。

假设我有一个S3 Bucket,其中包含以下文件:

file1 --------- 2GB
file2 --------- 3GB
file3 --------- 1.9GB
file4 --------- 5GB

这些文件是使用S3 的预签名帖子URL上传的

我需要做的是让客户端能够在ZIP(或类似的文件(中下载所有文件,但我不能在内存中也不能在服务器存储中下载,因为这是一个无服务器的设置。

根据我的理解,理想情况下服务器需要:

  1. 在S3上启动多部分上传作业
  2. 可能需要向多部分作业发送一个chunk作为zip文件的头
  3. 在某种流中逐块下载bucket中的每个文件,以免溢出存储器
  4. 使用上面的流创建一个zip块并在多部分作业中发送
  5. 完成多部分作业和zip文件

现在,我真的不知道如何实现这一点,也不知道这是否可能,但有些问题是:

  • 如何在S3中分块下载文件?最好使用boto3或botocore
  • 如何在释放内存的同时分块创建zip文件
  • 如何在多部分上传中连接这些

Edit:现在我想一想,也许我甚至不需要把ZIP文件放在S3中,我可以直接流式传输到客户端,对吧?实际上会更好

下面是一些假设的代码,假设我在上面编辑:

#Let's assume Flask
@app.route(/'download_bucket_as_zip'):
def stream_file():
def stream():
#Probably needs to yield zip headers/metadata?
for file in getFilesFromBucket():
for chunk in file.readChunk(4000):
zipchunk = bytesToZipChunk(chunk)
yield zipchunk
return Response(stream(), mimetype='application/zip')

你的问题非常复杂,因为解决它会让你陷入很多兔子洞。

我相信Rahul Iyer走在了正确的道路上,因为IMHO会更容易启动一个新的EC2实例,压缩这个实例上的文件,并将它们移回一个只向客户端提供zip文件的S3存储桶。

如果你的文件较小,你可以使用AWS Cloudfront来处理客户端请求文件时的压缩。

在我的研究过程中,我确实注意到其他语言,如.Net和Java,都有处理zip文件流的API。我还查看了zipstream,它已经分叉了好几次。目前还不清楚如何使用zipstream来流式传输文件以进行压缩。

下面的代码将块化一个文件,并将chuck写入一个zip文件。输入文件接近12Gbs,输出文件几乎为5Gbs。

在测试过程中,我没有发现内存使用方面的任何重大问题或大的峰值。

我确实在下面的一篇文章中添加了一些伪S3代码。我认为还需要更多的测试来了解这些代码是如何在S3中的文件上工作的。

from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
# This module is needed for ZIP_DEFLATED
import zlib

class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self, b):
if self.closed:
raise ValueError('The stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk

def zipfile_generator(path, stream):
with ZipFile(stream, mode='w') as zip_archive:
z_info = ZipInfo.from_file(path)
z_info.compress_type = ZIP_DEFLATED
with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest: 
for chunk in iter(lambda: entry.read(16384), b''): # 16384 is the maximum size of an SSL/TLS buffer.
dest.write(chunk)
yield stream.get()
yield stream.get()

stream = UnseekableStream()
# each on the input files was 4gb
files = ['input.txt', 'input2.txt', 'input3.txt']
with open("test.zip", "wb") as f:
for item in files:
for i in zipfile_generator(item, stream):
f.write(i)
f.flush()
stream.close()
f.close()

伪代码s3/邮政编码

此代码是严格假设的,因为它需要测试

from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
import os
import boto3
# This module is needed for ZIP_DEFLATED
import zlib
session = boto3.Session(
aws_access_key_id='XXXXXXXXXXXXXXXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
region_name='XXXXXXXXXX')
s3 = session.resource('s3')
bucket_name = s3.Bucket('bucket name')
class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self, b):
if self.closed:
raise ValueError('The stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk

def zipfile_generator(path, stream):
with ZipFile(stream, mode='w') as zip_archive:
z_info = ZipInfo.from_file(path)
z_info.compress_type = ZIP_DEFLATED
with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest:
for chunk in iter(lambda: entry.read(16384), b''):
dest.write(chunk)
yield stream.get()
yield stream.get()

stream = UnseekableStream()
with open("test.zip", "wb") as f:
for file in bucket_name.objects.all():
obj = s3.get_object(Bucket=bucket_name, Key=file.key)
for i in zipfile_generator(obj.get(), stream):
f.write(i)
f.flush()
stream.close()
f.close()

我需要做的是让客户端能够下载它们都在ZIP(或类似的(中,但我不能在内存中完成,也不能在服务器存储,因为这是一个无服务器的设置。

当你说无服务器时,如果你的意思是你想使用Lambda在S3中创建一个zip文件,你会遇到一些限制:

  • Lambda对函数的执行时间有时间限制
  • 由于Lambda有内存限制,在Lambda函数中组装大文件时可能会遇到问题
  • Lambda对PUT调用的最大大小有限制

由于上述原因,我认为以下方法更好:

  • 当需要文件时,立即创建一个EC2实例。也许您的lambda函数可以触发EC2实例的创建
  • 将所有文件复制到机器的实例存储中,甚至复制到EFS中
  • 将文件压缩到zip中
  • 将文件上传回S3或直接提供文件
  • 杀死EC2实例

在我看来,这将大大简化您必须编写的代码,因为在您的笔记本电脑/台式机上运行的任何代码都可能在EC2实例上运行。您也不会有lambda的时间/空间限制。

一旦zip文件上传回S3,您就可以摆脱EC2实例,因此您不必担心服务器始终运行的成本——只需在需要时启动一个,并在完成后杀死它。

压缩文件夹中多个文件的代码可以简单到:

发件人:https://code.tutsplus.com/tutorials/compressing-and-extracting-files-in-python--cms-26816

import os
import zipfile

fantasy_zip = zipfile.ZipFile('C:\Stories\Fantasy\archive.zip', 'w')

for folder, subfolders, files in os.walk('C:\Stories\Fantasy'):

for file in files:
if file.endswith('.pdf'):
fantasy_zip.write(os.path.join(folder, file), os.path.relpath(os.path.join(folder,file), 'C:\Stories\Fantasy'), compress_type = zipfile.ZIP_DEFLATED)

fantasy_zip.close()
import io

class S3File(io.RawIOBase):
def __init__(self, s3_object):
self.s3_object = s3_object
self.position = 0
def __repr__(self):
return "<%s s3_object=%r>" % (type(self).__name__, self.s3_object)
@property
def size(self):
return self.s3_object.content_length
def tell(self):
return self.position
def seek(self, offset, whence=io.SEEK_SET):
if whence == io.SEEK_SET:
self.position = offset
elif whence == io.SEEK_CUR:
self.position += offset
elif whence == io.SEEK_END:
self.position = self.size + offset
else:
raise ValueError("invalid whence (%r, should be %d, %d, %d)" % (
whence, io.SEEK_SET, io.SEEK_CUR, io.SEEK_END
))
return self.position
def seekable(self):
return True
def read(self, size=-1):
if size == -1:
# Read to the end of the file
range_header = "bytes=%d-" % self.position
self.seek(offset=0, whence=io.SEEK_END)
else:
new_position = self.position + size
# If we're going to read beyond the end of the object, return
# the entire object.
if new_position >= self.size:
return self.read()
range_header = "bytes=%d-%d" % (self.position, new_position - 1)
self.seek(offset=size, whence=io.SEEK_CUR)
return self.s3_object.get(Range=range_header)["Body"].read()
def readable(self):
return True

if __name__ == "__main__":
import zipfile
import boto3
s3 = boto3.resource("s3")
s3_object = s3.Object(bucket_name="bukkit", key="bagit.zip")
s3_file = S3File(s3_object)
with zipfile.ZipFile(s3_file) as zf:
print(zf.namelist())

也许最好使用javascript编写的zip编码器,比如JSZip。或类似node-lz4

最新更新