我有两个大型数据集。第一个数据集包含约1.3亿个条目
第二个数据集包含约40000个条目。数据是从MySQL表中提取的。
我需要交叉加入,但我得到了
java.sql.SQLException: GC overhead limit exceeded
在Scala中实现这一点的最佳技术是什么?
以下是我的代码片段:
val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)
df1是较大的数据集,df2是较小的数据集。
130M*40K=52万亿条记录需要52 TB的内存来存储这些数据,如果我们假设每条记录都是1字节,这肯定不是真的。如果它多达64字节(我认为这也是一个非常保守的估计),那么仅存储数据就需要3.32 PB的内存。这是一个很大的数量,所以除非你有一个非常大的集群和非常快速的网络,否则你可能需要重新思考你的算法,让它发挥作用。
也就是说,当您执行两个SQL数据集/数据帧的join
时,Spark将用于存储联接结果的分区数由spark.sql.shuffle.partitions
属性控制(请参阅此处)。您可能希望将其设置为一个非常大的数字,并将执行器的数量设置为最大的一个。然后,您可能能够将处理运行到底。
此外,您可能需要查看spark.shuffle.minNumPartitionsToHighlyCompress
选项;如果将其设置为小于shuffle分区的数量,则可能会获得另一次内存提升。请注意,在最近的Spark版本之前,此选项是一个硬编码常量,设置为2000,因此根据您的环境,您只需要将spark.sql.shuffle.partitions
设置为大于2000的数字即可使用它。
同意Vladimir的观点,考虑增加更多分数。
请参阅MapStatus,将spark.sql.shuffle.partitions
设置为2001
(旧方法)(默认值为200)。
Vladimir在回答中提到的新方法(spark.shuffle.minNumPartitionsToHighlyCompress
)。
为什么会发生这种变化?:MapStatus有2000个硬编码的SPARK-24519
它将应用不同的算法来处理
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
}
HighlyCompressedMapStatus
:
一个MapStatus实现,用于存储块,这些块大于spark.shuffle.correcteBlockThreshold。它存储其他非空块的平均大小,以及位图用于跟踪哪些块是空的。
spark.shuffle.actureBlockThreshold-请参阅此处:当我们在HighlyCompressedMapStatus
中压缩shuffle块的大小时,如果它高于此配置,我们将准确记录大小。这有助于防止OOM,避免在获取混洗块时低估混洗块大小。
CompressedMapStatus
:
跟踪每个块大小的MapStatus实现。大小对于每个块使用单个字节来表示。
也设置为您的spark-submit
--conf spark.yarn.executor.memoryOverhead=<10% of executor memory> -- conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true
在这两种情况下,压缩都将使用spark.io.compression.codec
结论:大型任务应该使用
HighlyCompressedMapStatus
,执行器内存开销可能是执行器内存的10%。
此外,请查看火花存储器调整
将SPARK_EXECUTOR_MEMBORY增加到更高的值,并重新分区到更多分区