使用火花优化 S3 到 S3 的传输



我正在学习 spark/scala,并尝试使用 scala 语言尝试以下场景。 场景:将多个文件从一个 S3 存储桶文件夹复制到另一个 S3 存储桶文件夹。

到目前为止完成的事情:
1)使用 AWS S3 开发工具包和 scala: - 从 S3 源位置创建文件列表。 - 循环访问列表,传递步骤 1 中的源和目标 S3 位置,并使用 S3 APIcopyObject将每个文件复制到目标位置(已配置)。 这行得通。

但是,我试图了解如果我在多个文件夹中有大量文件,这是最有效的方法,还是我可以使用 Spark 并行化此文件副本?

我正在考虑的方法是:
1)使用 S3 开发工具包获取类似于上面解释的源路径
2) 使用 sc.parallelize() 为每个文件创建一个 RDD - 这些行上的内容?sc.parallelize(objs.getObjectSummaries.map(_.getKey).toList) .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
3) 我可以以某种方式使用 sc.wholeTextFiles 来完成这项工作吗? 到目前为止,我不确定如何实现这一目标。

你能帮我理解我是否在朝着正确的方向思考吗,这种方法是否正确?

谢谢

我认为AWS并没有让它变得复杂。

我们遇到了同样的问题,我们在接近 2 分钟的时间内传输了大约 10TB。

如果要从一个存储桶传输到另一个存储桶,最好使用内置功能在 s3 本身中进行传输。

https://docs.aws.amazon.com/cli/latest/reference/s3/sync.html

AWS CLI 命令示例:

AWS S3 同步 S3://

Sourcebucket S3://Destinationbucket

如果要以编程方式执行此操作,可以使用所有SDK来调用相同类型的命令。我会避免重新发明同一个轮子。

希望对您有所帮助。

我剪了一个代码,cloudCp,它使用spark进行高性能并行上传;这类似于为复制做一些事情,在那里你会放到AWS库进行该操作。

但是:您可能不需要将工作推送到许多计算机,因为每个 PUT/x-copy-source 调用可能很慢,但它不会占用任何带宽。你可以启动一个包含许多线程和大型HTTP客户端池的进程,然后在该进程中运行它们。获取列表,首先按最大的几个排序,然后随机随机打乱其余部分以减少限制效果。打印计数器以帮助配置文件...

我编写此代码以递归方式并行同步许多文件夹。列出某个文件夹或存储桶中的所有上层对象,准备一个包含命令列表的数据集,然后执行一个命令。请参阅代码中的注释。

在 50 个执行器(4 个内核,6 克)上执行,1 分钟内执行 1Tb。 我没有在单独的命令中并行复制每个对象的原因是,我有许多文件夹,每个文件夹中的文件太多,并且我不需要太多的并行izm来保持集群的活动状态以执行其他任务,因此我使用aws s3 sync递归同步文件夹而不是每个文件。 如果您只需要复制所有文件而不是同步,请使用aws s3 cp,它的工作速度比sync.如果已复制某些数据,并且您只需要复制缺少的对象,请使用sync

import collection.JavaConversions._ 
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request
import com.amazonaws.services.s3.model.ListObjectsV2Result
import sys.process._
val srcBucket = "source_bucket_name"
val tgtBucket = "target_bucket_name"
val prefix = "folder/" //set to empty if you need to copy all bucket content

def listObjects(str: String): List[String] = {
val s3_client = AmazonS3ClientBuilder.standard().build()
val request: ListObjectsV2Request = new ListObjectsV2Request().withBucketName(srcBucket).withDelimiter("/").withPrefix(str)
val result = s3_client.listObjectsV2(request)
val directories = result.getCommonPrefixes()
directories.toList
}
//Function to be used in map() to construct cp or sync command for each DF element 
def getCpString(key: String, srcBucket: String, tgtBucket: String): String = {
s"aws s3 sync s3://$srcBucket/$key s3://$tgtBucket/$key --size-only"
}
val directories = listObjects(prefix)
val ds=directories.toDS.repartition(50).map(row => getCpString(row, srcBucket, tgtBucket))
//filter ds if necessary like this .filter(not($"value".contains("some_folder/")))
//check command list
ds.show(500, false)

//Now execute commands in parallel
ds.rdd.foreachPartition(partition => {
partition.foreach(command=>{
//read stderr and stdout 
//to make sure 
//the command will be not locked
//when stdout/stderr buffer is full
val status = s"$command" ! ProcessLogger((o: String) => println(command + " out " + o),(e: String) => println(command + " error " + e))
})
})

最新更新