我正在写一个从DataFrame到S3的parquet文件。当我查看Spark UI时,我可以看到除了我之外的所有任务都在编写阶段迅速完成(例如1999/200)。最后一个任务似乎要花很长时间才能完成,而且通常由于超出执行器内存限制而失败。
我想知道最后这个任务发生了什么。如何优化呢?谢谢。
我已经尝试过Glemmie Helles Sindholt解决方案,效果很好。下面是代码:
path = 's3://...'
n = 2 # number of repartitions, try 2 to test
spark_df = spark_df.repartition(n)
spark_df.write.mode("overwrite").parquet(path)
听起来你的数据有偏差。您可以通过在写入S3之前在DataFrame
上调用repartition
来解决这个问题。
正如其他人所指出的那样,数据倾斜可能在起作用。
除此之外,我注意到你的任务计数是200
。
配置参数spark.sql.shuffle.partitions
配置用于连接或聚合的数据转移时使用的分区数。
200
是此设置的默认值,但通常它远不是最优值。
对于小数据,200可能是多余的,并且您会在多个分区的开销中浪费时间。
对于大数据,200可能导致大分区,应该将其分解为更多、更小的分区。
真正粗略的经验法则是:-有2-3倍数量的分区到cpu。-或~128MB。2GB是最大分区大小。如果刚好低于2000个分区,那么当分区数量大于2000时,Spark使用不同的数据结构进行随机簿记[1]
private[spark] object MapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > 2000) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
}
...
你可以在运行时尝试使用这个参数:
spark.conf.set("spark.sql.shuffle.partitions", "300")
[1] Spark . SQL .shuffle.partitions的最优值应该是什么,或者我们如何在使用Spark SQL时增加分区?
这篇文章- The Bleeding Edge: Spark, Parquet和S3有很多关于Spark, S3和Parquet的有用信息。特别是,它讨论了驱动程序如何最终写出_common_metadata_文件,并且可能会花费相当多的时间。有一种方法可以关闭它。
不幸的是,他们说他们继续自己生成公共元数据,但并没有真正谈论他们是如何做到的。