有效地缓存大型数据范围



我目前正在处理11,000个文件。每个文件将生成一个数据框,该数据将与上一个文件结合。以下是代码:

var df1 = sc.parallelize(Array(("temp",100 ))).toDF("key","value").withColumn("Filename", lit("Temp") )     
files.foreach( filename => {
             val a = filename.getPath.toString()
             val m = a.split("/")
             val name = m(6)
             println("FILENAME: " + name)                
             if (name == "_SUCCESS") {
               println("Cannot Process '_SUCCSS' Filename")
             } else {
               val freqs=doSomething(a).toDF("key","value").withColumn("Filename", lit(name) )
               df1=df1.unionAll(freqs)
             }
})

首先,我在11,000个文件上遇到了java.lang.StackOverFlowError的错误。然后,我在df1=df1.unionAll(freqs)之后添加以下行:

 df1=df1.cache()

它解决了问题,但是在每次迭代之后,它变得越来越慢。有人可以建议我做什么以避免StackOverflowError而不会减少时间。谢谢!

问题是SPARK将数据框架作为一组转换。它从第一个数据框的" todf"开始,然后在其上执行转换(例如使用column),然后与以前的dataFrame等联合。

Unionall只是另一种转换,树变得很长(带有11K Unionall,您有一个深度为11K的执行树)。构建信息时的联合商可能会陷入堆栈溢出情况。

缓存无法解决此问题,但是,我想您正在沿途添加一些动作(否则除了构建转换外,没有任何事情可以运行)。当您执行缓存时,Spark可能会跳过一些步骤,因此,堆栈溢出将稍后到达。

您可以返回RDD进行迭代过程(您的示例实际上不是迭代的,但纯粹是平行的,您只需在途中保存每个单独的数据帧,然后转换为RDD并使用RDD Unim)。

由于您的案件似乎正在加入联合一堆没有真正迭代的数据范围,因此您也可以以树的方式进行联盟(即联合对,然后是成对的联合对等)。n)到o(log n)其中n是工会的数量。

最后,您可以从磁盘读取/编写数据框。这个想法是,在每个x(例如20)工会之后,您将进行df1.write.parquet(filex),然后df1 = spark.read.parquet(filex)。当您阅读单个数据框的谱系时,将是文件本身。费用当然是文件的写作和阅读。

相关内容

  • 没有找到相关文章

最新更新