Spark job hangs when writing a DF

The application executes up to sysDF.write.partitionBy and successfully writes out the first parquet file, but after that it hangs and all executors are killed until the job times out. The code that triggers the actions is as follows:

import sqlContext.implicits._
val systemRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[SystemLog]) basicLog.asInstanceOf[SystemLog] else null).filter(_ != null)
val sysDF = systemRDD.toDF()
sysDF.write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2)
val customRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[CustomLog]) basicLog.asInstanceOf[CustomLog] else null).filter(_ != null)
val customDF = customRDD.toDF()
customDF.write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2)
val illegalRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[IllegalLog]) basicLog.asInstanceOf[IllegalLog] else null).filter(_ != null)
val illegalDF = illegalRDD.toDF()
illegalDF.write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2)

First, the map-to-null followed by the null filter can be collapsed into a single collect with a pattern match, which filters and downcasts in one pass and avoids allocating the intermediate nulls (note that .toDF() is still needed before the RDD can be written):

val rdd = basicLogRDD.cache()
rdd.collect { case log: SystemLog => log }.toDF().write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2)
rdd.collect { case log: CustomLog => log }.toDF().write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2)
rdd.collect { case log: IllegalLog => log }.toDF().write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2)

First, caching basicLogRDD is a good idea because it is used three times; the .cache() operator keeps the RDD in memory so it is not recomputed for each of the three writes. Second, there is no need to bind each DataFrame to its own variable: with import sqlContext.implicits._ in scope, the .toDF() conversion can be chained directly on the filtered RDD just before the write.
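For reference, here is a minimal end-to-end sketch of the combined approach. BasicLog, SystemLog, CustomLog and IllegalLog are taken from the question, but their definitions are not shown there, so the sealed-trait hierarchy, the fields, and the paths below are assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumed hierarchy; the question does not show these definitions.
sealed trait BasicLog { def appId: String }
case class SystemLog(appId: String, msg: String) extends BasicLog
case class CustomLog(appId: String, msg: String) extends BasicLog
case class IllegalLog(appId: String, msg: String) extends BasicLog

object LogWriter {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("log-writer"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // provides .toDF() for RDDs of case classes

    // Stand-in input; in the real job basicLogRDD comes from parsing the logs.
    val basicLogRDD = sc.parallelize(Seq[BasicLog](
      SystemLog("app1", "boot"), CustomLog("app1", "click"), IllegalLog("app2", "bad")))
    val outputPath = "/tmp/logs" // placeholder path
    val dateY4M2D2 = "20240101"  // placeholder date

    // Cached because it feeds all three writes.
    val rdd = basicLogRDD.cache()
    rdd.collect { case log: SystemLog => log }.toDF().write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2)
    rdd.collect { case log: CustomLog => log }.toDF().write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2)
    rdd.collect { case log: IllegalLog => log }.toDF().write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2)
  }
}

Note that RDD.collect with a PartialFunction is a transformation that fuses the filter and the cast; it is unrelated to the zero-argument collect() action that pulls data back to the driver.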
