Spark把Parquet写到S3,最后一个任务花了很长时间



我正在写一个从DataFrame到S3的parquet文件。当我查看Spark UI时,我可以看到除了我之外的所有任务都在编写阶段迅速完成(例如1999/200)。最后一个任务似乎要花很长时间才能完成,而且通常由于超出执行器内存限制而失败。

我想知道最后这个任务发生了什么。如何优化呢?谢谢。

我已经尝试过Glemmie Helles Sindholt解决方案,效果很好。下面是代码:

path = 's3://...'
n = 2 # number of repartitions, try 2 to test
spark_df = spark_df.repartition(n)
spark_df.write.mode("overwrite").parquet(path)

听起来你的数据有偏差。您可以通过在写入S3之前在DataFrame上调用repartition来解决这个问题。

正如其他人所指出的那样,数据倾斜可能在起作用。

除此之外,我注意到你的任务计数是200

配置参数spark.sql.shuffle.partitions配置用于连接或聚合的数据转移时使用的分区数。

200是此设置的默认值,但通常它远不是最优值。

对于小数据,200可能是多余的,并且您会在多个分区的开销中浪费时间。

对于大数据,200可能导致大分区,应该将其分解为更多、更小的分区。

真正粗略的经验法则是:-有2-3倍数量的分区到cpu。-或~128MB。

2GB是最大分区大小。如果刚好低于2000个分区,那么当分区数量大于2000时,Spark使用不同的数据结构进行随机簿记[1]

private[spark] object MapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

你可以在运行时尝试使用这个参数:

spark.conf.set("spark.sql.shuffle.partitions", "300")

[1] Spark . SQL .shuffle.partitions的最优值应该是什么,或者我们如何在使用Spark SQL时增加分区?

这篇文章- The Bleeding Edge: Spark, Parquet和S3有很多关于Spark, S3和Parquet的有用信息。特别是,它讨论了驱动程序如何最终写出_common_metadata_文件,并且可能会花费相当多的时间。有一种方法可以关闭它。

不幸的是,他们说他们继续自己生成公共元数据,但并没有真正谈论他们是如何做到的。

相关内容

  • 没有找到相关文章

最新更新