Spark中两个大型数据集之间的交叉连接



我有两个大型数据集。第一个数据集包含约1.3亿个条目
第二个数据集包含约40000个条目。数据是从MySQL表中提取的。

我需要交叉加入,但我得到了

java.sql.SQLException: GC overhead limit exceeded

在Scala中实现这一点的最佳技术是什么?

以下是我的代码片段:

val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

df1是较大的数据集,df2是较小的数据集。

130M*40K=52万亿条记录需要52 TB的内存来存储这些数据,如果我们假设每条记录都是1字节,这肯定不是真的。如果它多达64字节(我认为这也是一个非常保守的估计),那么仅存储数据就需要3.32 PB的内存。这是一个很大的数量,所以除非你有一个非常大的集群和非常快速的网络,否则你可能需要重新思考你的算法,让它发挥作用。

也就是说,当您执行两个SQL数据集/数据帧的join时,Spark将用于存储联接结果的分区数由spark.sql.shuffle.partitions属性控制(请参阅此处)。您可能希望将其设置为一个非常大的数字,并将执行器的数量设置为最大的一个。然后,您可能能够将处理运行到底。

此外,您可能需要查看spark.shuffle.minNumPartitionsToHighlyCompress选项;如果将其设置为小于shuffle分区的数量,则可能会获得另一次内存提升。请注意,在最近的Spark版本之前,此选项是一个硬编码常量,设置为2000,因此根据您的环境,您只需要将spark.sql.shuffle.partitions设置为大于2000的数字即可使用它。

同意Vladimir的观点,考虑增加更多分数。

请参阅MapStatus,将spark.sql.shuffle.partitions设置为2001(旧方法)(默认值为200)。

Vladimir在回答中提到的新方法(spark.shuffle.minNumPartitionsToHighlyCompress)。

为什么会发生这种变化?:MapStatus有2000个硬编码的SPARK-24519

它将应用不同的算法来处理

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
}

HighlyCompressedMapStatus:

一个MapStatus实现,用于存储块,这些块大于spark.shuffle.correcteBlockThreshold。它存储其他非空块的平均大小,以及位图用于跟踪哪些块是空的。

spark.shuffle.actureBlockThreshold-请参阅此处:当我们在HighlyCompressedMapStatus中压缩shuffle块的大小时,如果它高于此配置,我们将准确记录大小。这有助于防止OOM,避免在获取混洗块时低估混洗块大小。


CompressedMapStatus:

跟踪每个块大小的MapStatus实现。大小对于每个块使用单个字节来表示。

也设置为您的spark-submit

--conf spark.yarn.executor.memoryOverhead=<10% of executor memory>  -- conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true 

在这两种情况下,压缩都将使用spark.io.compression.codec

结论:大型任务应该使用HighlyCompressedMapStatus,执行器内存开销可能是执行器内存的10%。

此外,请查看火花存储器调整

将SPARK_EXECUTOR_MEMBORY增加到更高的值,并重新分区到更多分区

最新更新