AWS 下载所有 Spark "part-*" 文件并将它们合并到单个本地文件中



我通过AWS上的databricks运行了一个Spark作业,并通过调用

big_old_rdd.saveAsTextFile("path/to/my_file.json")

已将我的作业结果保存到AWS上的S3桶中。该spark命令的结果是一个目录path/to/my_file.json,其中包含结果的部分:

_SUCCESS
part-00000
part-00001
part-00002

等等。我可以使用AWS CLI通过一个相对简单的命令将这些部件文件复制到我的本地机器上:

aws s3 cp s3://my_bucket/path/to/my_file.json local_dir --recursive

,现在我已经在本地得到了所有的part-*文件。然后我可以用

得到一个文件
cat $(ls part-*) > result.json

问题是这个两阶段的过程很麻烦,并且到处都是文件部分。我想找到一个单一的命令,将下载和合并的文件(理想情况下按顺序)。当直接处理HDFS时,类似于hadoop fs -cat "path/to/my_file.json/*" > result.json

我查看了AWS CLI文档,但没有找到自动合并文件部分或删除文件的选项。我对AWS API中的一些奇特的工具或一些bash魔法感兴趣,这些工具将组合上述命令。

注意:通过spark将结果保存到单个文件中是不可行的,因为这需要在作业期间将数据合并到单个分区中。在AWS上拥有多个部件文件是可以的,如果不合适的话。但是当我下载本地副本时,我想要合并。

这可以通过使用boto3 (AWS python SDK)的一个相对简单的函数来完成。

解决方案包括列出给定密钥中的part-*对象,然后下载每个对象并将其附加到file对象。首先,在bucket my_bucket中列出path/to/my_file.json中的部件文件:

import boto3
bucket = boto3.resource('s3').Bucket('my_bucket')
keys = [obj.key for obj in bucket.objects.filter(Prefix='path/to/my_file.json/part-')]

然后,使用Bucket.download_fileobj()和一个以追加模式打开的文件来写入每个部分。我现在使用的函数是:

from os.path import basename
import boto3
def download_parts(base_object, bucket_name, output_name=None, limit_parts=0):
    """Download all file parts into a single local file"""
    base_object = base_object.rstrip('/')
    bucket = boto3.resource('s3').Bucket(bucket_name)
    prefix = '{}/part-'.format(base_object)
    output_name = output_name or basename(base_object)
    with open(output_name, 'ab') as outfile:
        for i, obj in enumerate(bucket.objects.filter(Prefix=prefix)):
            bucket.download_fileobj(obj.key, outfile)
            if limit_parts and i >= limit_parts:
                print('Terminating download after {} parts.'.format(i))
                break
        else:
            print('Download completed after {} parts.'.format(i))

下载部分可能是额外的一行代码。

cat的顺序而言,您可以根据创建的时间或按字母顺序执行。

按时间顺序组合: cat $(ls -t) > outputfile

相结合, cat $(ls part-* | sort) > outputfile

相结合, cat $(ls part-* | sort -r) > outputfile

最新更新