我目前正在处理11,000个文件。每个文件将生成一个数据框,该数据将与上一个文件结合。以下是代码:
var df1 = sc.parallelize(Array(("temp",100 ))).toDF("key","value").withColumn("Filename", lit("Temp") )
files.foreach( filename => {
val a = filename.getPath.toString()
val m = a.split("/")
val name = m(6)
println("FILENAME: " + name)
if (name == "_SUCCESS") {
println("Cannot Process '_SUCCSS' Filename")
} else {
val freqs=doSomething(a).toDF("key","value").withColumn("Filename", lit(name) )
df1=df1.unionAll(freqs)
}
})
首先,我在11,000个文件上遇到了java.lang.StackOverFlowError
的错误。然后,我在df1=df1.unionAll(freqs)
之后添加以下行:
df1=df1.cache()
它解决了问题,但是在每次迭代之后,它变得越来越慢。有人可以建议我做什么以避免StackOverflowError
而不会减少时间。谢谢!
问题是SPARK将数据框架作为一组转换。它从第一个数据框的" todf"开始,然后在其上执行转换(例如使用column),然后与以前的dataFrame等联合。
。Unionall只是另一种转换,树变得很长(带有11K Unionall,您有一个深度为11K的执行树)。构建信息时的联合商可能会陷入堆栈溢出情况。
缓存无法解决此问题,但是,我想您正在沿途添加一些动作(否则除了构建转换外,没有任何事情可以运行)。当您执行缓存时,Spark可能会跳过一些步骤,因此,堆栈溢出将稍后到达。
您可以返回RDD进行迭代过程(您的示例实际上不是迭代的,但纯粹是平行的,您只需在途中保存每个单独的数据帧,然后转换为RDD并使用RDD Unim)。
由于您的案件似乎正在加入联合一堆没有真正迭代的数据范围,因此您也可以以树的方式进行联盟(即联合对,然后是成对的联合对等)。n)到o(log n)其中n是工会的数量。
最后,您可以从磁盘读取/编写数据框。这个想法是,在每个x(例如20)工会之后,您将进行df1.write.parquet(filex),然后df1 = spark.read.parquet(filex)。当您阅读单个数据框的谱系时,将是文件本身。费用当然是文件的写作和阅读。