我正在尝试加载〜1m文件集存储在S3上。运行sc.binaryFiles("s3a://BUCKETNAME/*").count()
我得到WARN TaskSetManager: Stage 0 contains a task of very large size (177 KB). The maximum recommended task size is 100 KB
。接下来是失败的任务
我看到它在此阶段中占128个分区,该分区太低了,请注意,在400K文件存储桶上运行相同的命令时,分区数将更高(〜2k分区),并且操作将成功。<<<。/p>
设置更高的minPartitions
无济于事;设置更高的spark.default.parallelism
也没有帮助。
唯一有效的是创建每个1000个文件的多个较小的RDD,并在它们上运行sc.union
,这种方法的问题是它太慢了。
如何减轻此问题?
更新:继续查看BinaryFileRDD.getPartitions()
中如何解决分区的数量,这使我进入了此代码:
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
val defaultParallelism = sc.defaultParallelism
val files = listStatus(context).asScala
val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
super.setMaxSplitSize(maxSplitSize)
}
我遵循计算,但仍然没有意义,我应该得到更大的数字。
因此,我尝试减少config.FILES_MAX_PARTITION_BYTES
配置(spark.files.maxPartitionBytes
) - 这确实增加了分区的数量,并完成了工作完成,但是我仍会收到原始警告(任务大小较小),并且仍然分区的木材比在400k文件集上运行时小。
问题植根于文件的大小:令我惊讶的是,S3中的文件未正确上传,它们的大小比应有的大100倍。这导致setMinPartitions
计算包含大量小文件的拆分。每个拆分本质上是一个逗号分隔的文件路径字符串,由于我们每个分式有很多文件,因此我们得到了一个很长的指令字符串,应将其传达给所有工人。这使网络负担重,并导致整个流量失败。将spark.files.maxPartitionBytes
设置为较低的值解决了问题。