如何在S3中从.tar.gz boto3流式传输和读取



S3上有一个JSON文件,格式如下:

{"field1": "...", "field2": "...", ...}
{"field1": "...", "field2": "...", ...}
{"field1": "...", "field2": "...", ...}

它是压缩的,采用.tar.gz格式,解压缩后的大小约为30GB,因此我想以流媒体的方式阅读它。

使用aws cli,我用以下命令在本地完成了这项工作:

aws s3 cp s3://${BUCKET_NAME}/${FILE_NAME}.tar.gz - | gunzip -c -

但是,我希望在python 3.8中以本机方式执行此操作。

将各种在线解决方案合并,我尝试了以下策略:

1.正在解压缩内存文件[不工作]

import boto3, gzip, json
from io import BytesIO
s3 = boto3.resource('s3')
key = 'FILE_NAME.tar.gz'
streaming_iterator = s3.Object('BUCKET_NAME', key).get()['Body'].iter_lines()
first_line = next(streaming_iterator)
gzipline = BytesIO(first_line)
gzipline = gzip.GzipFile(fileobj=gzipline)
print(gzipline.read())

这提高了

EOFError: Compressed file ended before the end-of-stream marker was reached

2.使用外部库smart_open[部分工作]

import boto3
for line in open(
f's3://${BUCKET_NAME}/${FILE_NAME}.tar.gz',
mode="rb",
transport_params={"client": boto3.client('s3')},
encoding="raw_unicode_escape",
compression=".gz"
):
print(line)

第二种解决方案离散地适用于ASCII字符,但由于某些原因,它也会将非ASCII字符变成垃圾;例如

  • 输入xe5x9bxbexe6xa0x87xe3x80x82
  • 输出:åx9b¾æxa0x87ãx80x82
  • 预期输出图标。

这让我认为我输入的encoding是错误的,但我确实尝试了该页面中的所有编码,只有raw_unicode_escapeunicode_escapepalmos(?(不会导致异常,但它们都会产生垃圾。

欢迎任何建议,提前谢谢。

get_object()的调用返回的是一个StreamingBody对象,顾名思义,它将允许您以流式方式读取对象。但是,boto3不支持在此文件对象上进行查找。

虽然可以将此对象传递给tarfile.open调用,但需要小心。有两个注意事项。首先,您需要告诉tarfile,您正在使用打开字符串中的|字符向它传递一个不可查找的流对象,并且您不能执行任何会触发查找的操作,例如尝试先获取文件列表,然后对这些文件进行操作。

把所有这些放在一起相当简单,你只需要使用boto3打开一个对象,然后依次处理tar文件中的每个文件:

# Use boto3 to read the object from S3
s3 = boto3.client('s3')
resp = s3.get_object(Bucket='example-bucket', Key='path/to/example.tar.gz')
obj = resp['Body']
# Open the tar file, the "|" is important, as it instructs
# tarfile that the fileobj is non-seekable
with tarfile.open(fileobj=obj, mode='r|gz') as tar:
# Enumerate the tar file objects as we extract data
for member in tar:
with tar.extractfile(member) as f:
# Read each row in turn and decode it
for row in f:
row = json.loads(row)
# Just print out the filename and results in this demo
print(member.name, row)

最新更新