SPARK:X阶段X运行SC.BinaryFiles()时包含非常大的任务



我正在尝试加载〜1m文件集存储在S3上。运行sc.binaryFiles("s3a://BUCKETNAME/*").count()

我得到WARN TaskSetManager: Stage 0 contains a task of very large size (177 KB). The maximum recommended task size is 100 KB。接下来是失败的任务

我看到它在此阶段中占128个分区,该分区太低了,请注意,在400K文件存储桶上运行相同的命令时,分区数将更高(〜2k分区),并且操作将成功。<<<。/p>

设置更高的minPartitions无济于事;设置更高的spark.default.parallelism也没有帮助。

唯一有效的是创建每个1000个文件的多个较小的RDD,并在它们上运行sc.union,这种方法的问题是它太慢了。

如何减轻此问题?

更新:继续查看BinaryFileRDD.getPartitions()中如何解决分区的数量,这使我进入了此代码:

  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
    val defaultParallelism = sc.defaultParallelism
    val files = listStatus(context).asScala
    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    super.setMaxSplitSize(maxSplitSize)
  }

我遵循计算,但仍然没有意义,我应该得到更大的数字。

因此,我尝试减少config.FILES_MAX_PARTITION_BYTES配置(spark.files.maxPartitionBytes) - 这确实增加了分区的数量,并完成了工作完成,但是我仍会收到原始警告(任务大小较小),并且仍然分区的木材比在400k文件集上运行时小。

问题植根于文件的大小:令我惊讶的是,S3中的文件未正确上传,它们的大小比应有的大100倍。这导致setMinPartitions计算包含大量小文件的拆分。每个拆分本质上是一个逗号分隔的文件路径字符串,由于我们每个分式有很多文件,因此我们得到了一个很长的指令字符串,应将其传达给所有工人。这使网络负担重,并导致整个流量失败。将spark.files.maxPartitionBytes设置为较低的值解决了问题。

相关内容

最新更新