Spark:读写Parquet导致OutOfMemoryError: Java堆空间



我编写了一些代码来读取parquet文件,稍微切换模式并将数据写入新的parquet文件。代码如下所示:

...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)
val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r =>  Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData,  schema)
Writer.writeToParquet(df)

Writer

object Writer {
    def writeToParquet(df : DataFrame) : Unit = {
       val future = Future {
         df.write.mode(SaveMode.Append).save(path)
       }
       Await.ready(future, Duration.Inf)
    }
}

对于大约4 GB的文件,我的程序中断,引发OutOfMemoryError: Java堆空间。我为执行器设置了6 GB内存(使用-Dspark.executor.memory=6g),提高了JVM堆空间(使用-Xmx6g),将Kryo序列化器缓冲区增加到2 GB(使用System.setProperty("spark.kryoserializer.buffer.mb", "2048"))。然而,我仍然得到错误。

这是堆栈跟踪:

java.lang.OutOfMemoryError: Java heap space
  at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
  at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
  at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:744)

我怎么做才能避免这个错误?

根据我的评论,有两点:

1)你需要注意spark.kryoserializer.buffer.mb的属性名,在最新的spark中,他们把它改成了spark.kryoserializer.bufferspark.kryoserializer.buffer.max

2)您必须小心缓冲区的大小和堆大小,它必须足够大,以存储您正在编写的单个记录,但不会更多,因为kryo正在创建该大小的显式byte[](并为2GB分配单个byte数组通常是一个坏主意)。尝试使用适当的属性降低缓冲区大小。

使用sparklyr,有相同的OutOfMemoryError,尽管减少了spark.kryoserializer.buffer,我不能读拼花纸,我能写的文件,我的解决方案是:

禁用"eager"内存加载选项:(memory=FALSE)

spark_read_parquet(sc,name=curName,file.path("file://",srcFile), header=true, memory=FALSE)

tripwire火花sparklyr 1.0.0R version 3.4.2

相关内容

  • 没有找到相关文章

最新更新