我编写了一些代码来读取parquet文件,稍微切换模式并将数据写入新的parquet文件。代码如下所示:
...
val schema = StructType(
List(
StructField("id", LongType, false),
StructField("data", ArrayType(FloatType), false)
)
)
val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)
Writer.writeToParquet(df)
, Writer
为
object Writer {
def writeToParquet(df : DataFrame) : Unit = {
val future = Future {
df.write.mode(SaveMode.Append).save(path)
}
Await.ready(future, Duration.Inf)
}
}
对于大约4 GB的文件,我的程序中断,引发OutOfMemoryError: Java堆空间。我为执行器设置了6 GB内存(使用-Dspark.executor.memory=6g
),提高了JVM堆空间(使用-Xmx6g
),将Kryo序列化器缓冲区增加到2 GB(使用System.setProperty("spark.kryoserializer.buffer.mb", "2048")
)。然而,我仍然得到错误。
这是堆栈跟踪:
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
我怎么做才能避免这个错误?
根据我的评论,有两点:
1)你需要注意spark.kryoserializer.buffer.mb
的属性名,在最新的spark中,他们把它改成了spark.kryoserializer.buffer
和spark.kryoserializer.buffer.max
。
2)您必须小心缓冲区的大小和堆大小,它必须足够大,以存储您正在编写的单个记录,但不会更多,因为kryo正在创建该大小的显式byte[]
(并为2GB分配单个byte
数组通常是一个坏主意)。尝试使用适当的属性降低缓冲区大小。
使用sparklyr,有相同的OutOfMemoryError,尽管减少了spark.kryoserializer.buffer,我不能读拼花纸,我能写的文件,我的解决方案是:
禁用"eager"内存加载选项:(memory=FALSE)
spark_read_parquet(sc,name=curName,file.path("file://",srcFile), header=true, memory=FALSE)
tripwire火花sparklyr 1.0.0R version 3.4.2