Apache Spark writer partitionBy causes OOM



有一个大于700gb的Parquet文件数据集。Parquet由2列组成,每列都有一个JSON文档。我现在想转换这些Parquet文件并将它们保存为分区。阅读,转换和保存。最后,有一个包含分区和相应Parquet文件的新文件夹。

读取源数据是通过spark.read.parquet("/my/folder/**/.parquet")*.转换是通过一些JSON辅助方法在这个数据框架上完成的。完成后,一个包含多个列的数据框可用。除了日期(YYYY-MM-DD)之外,还有其他列,原始数据也仍然可用。对于编写,我执行一个partitionByRange("date", "col1", "col2")sortWithinPartitions("date", "col1")write. partitionby ("date").我的小Spark集群(6个工作人员,4核和2GB内存)现在忙了几个小时。然而,当写作时,总是有一个空间。我的驱动程序(spark-shell)配备了24gb内存,机器不能提供更多内存。单独的文件可以处理得很好,我的问题似乎是数据量。我的猜测是:将工人的部分结果结合起来,可以找到司机的房间。我还尝试了maxRecordsPerFile选项,不幸的是没有成功。还有什么其他的可能性可以避免?

archiveDF
.repartitionByRange($"xxxx", $"startTime",$"uuid") // !!! causes oom !!!
.sortWithinPartitions("xxxx","startTime")
.write
.mode("append")
.option("maxRecordsPerFile", 50000)
.partitionBy("xxxx")
.format("parquet")
.save("/long-term-archive/data-store")

当您使用repartitionByRange(无论如何在Spark 3.2.1中)而不提供所需的分区数量时,Spark使用spark.sql.shuffle.partitions(默认为200)作为您想要最终获得的分区数量。

如果将总数据除以分区数,得到700 GB/200个分区= 3.5 GB/分区。这是非常大的(通常大约100MB的分区是一个好主意),而您的执行器只有0.5GB RAM/core。因此,在您的情况下,您可以尝试使用7000个分区,看看是否会得到这样一个更好的结果。比如:

.repartitionByRange(7000, $"xxxx", $"startTime",$"uuid")

我假设在这些重分区键上没有巨大的数据倾斜。如果有一个大的倾斜,你想要盐你的钥匙。

另一个问题:写的时候重新分区有什么用?重新分区需要shuffle操作,这通常是比较昂贵的操作之一。您应该尽量减少重新分区的次数。

最新更新