我通过AWS上的databricks运行了一个Spark作业,并通过调用
big_old_rdd.saveAsTextFile("path/to/my_file.json")
已将我的作业结果保存到AWS上的S3桶中。该spark命令的结果是一个目录path/to/my_file.json
,其中包含结果的部分:
_SUCCESS
part-00000
part-00001
part-00002
等等。我可以使用AWS CLI通过一个相对简单的命令将这些部件文件复制到我的本地机器上:
aws s3 cp s3://my_bucket/path/to/my_file.json local_dir --recursive
,现在我已经在本地得到了所有的part-*
文件。然后我可以用
cat $(ls part-*) > result.json
问题是这个两阶段的过程很麻烦,并且到处都是文件部分。我想找到一个单一的命令,将下载和合并的文件(理想情况下按顺序)。当直接处理HDFS时,类似于hadoop fs -cat "path/to/my_file.json/*" > result.json
。
我查看了AWS CLI文档,但没有找到自动合并文件部分或删除文件的选项。我对AWS API中的一些奇特的工具或一些bash魔法感兴趣,这些工具将组合上述命令。
注意:通过spark将结果保存到单个文件中是不可行的,因为这需要在作业期间将数据合并到单个分区中。在AWS上拥有多个部件文件是可以的,如果不合适的话。但是当我下载本地副本时,我想要合并。
这可以通过使用boto3
(AWS python SDK)的一个相对简单的函数来完成。
解决方案包括列出给定密钥中的part-*
对象,然后下载每个对象并将其附加到file对象。首先,在bucket my_bucket
中列出path/to/my_file.json
中的部件文件:
import boto3
bucket = boto3.resource('s3').Bucket('my_bucket')
keys = [obj.key for obj in bucket.objects.filter(Prefix='path/to/my_file.json/part-')]
然后,使用Bucket.download_fileobj()
和一个以追加模式打开的文件来写入每个部分。我现在使用的函数是:
from os.path import basename
import boto3
def download_parts(base_object, bucket_name, output_name=None, limit_parts=0):
"""Download all file parts into a single local file"""
base_object = base_object.rstrip('/')
bucket = boto3.resource('s3').Bucket(bucket_name)
prefix = '{}/part-'.format(base_object)
output_name = output_name or basename(base_object)
with open(output_name, 'ab') as outfile:
for i, obj in enumerate(bucket.objects.filter(Prefix=prefix)):
bucket.download_fileobj(obj.key, outfile)
if limit_parts and i >= limit_parts:
print('Terminating download after {} parts.'.format(i))
break
else:
print('Download completed after {} parts.'.format(i))
下载部分可能是额外的一行代码。
就cat
的顺序而言,您可以根据创建的时间或按字母顺序执行。
按时间顺序组合: cat $(ls -t) > outputfile
相结合, cat $(ls part-* | sort) > outputfile
相结合, cat $(ls part-* | sort -r) > outputfile