Unable to store the result in HDFS when the code runs a second iteration



Well, I am new to Spark and Scala and have been trying to implement data cleaning in Spark. The code below checks one column for missing values, stores them in outputrdd, and runs a loop to calculate each missing value. The code works fine when the file contains only one missing value; because HDFS does not allow writing to the same location twice, it fails when there are several. Could you help me write finalrdd to a specific location once all occurrences of the missing value have been calculated?

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("app").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val files = sc.wholeTextFiles("/input/raw_files/")
  val file = files.map { case (filename, content) => filename }
  file.collect.foreach(filename => {
    cleaningData(filename)
  })

  def cleaningData(file: String) = {
    // headers holds the column headers of the files
    var hdr = headers.toString()
    var vl = hdr.split("\t")
    sqlContext.clearCache()
    if (hdr.contains("COLUMN_HEADER")) {
      // checks for missing values in the dataframe and stores them in outputrdd
      if (!outputrdd.isEmpty()) {
        logger.info("value is zero then performing further operation")
        val outputdatetimedf = sqlContext.sql("select date,'\t',time from cpc where kwh = 0")
        val outputdatetimerdd = outputdatetimedf.rdd
        val strings = outputdatetimerdd.map(row => row.mkString).collect()
        for (i <- strings) {
          if (Coddition check) {
            // calculates the missing value and stores it in finalrdd
            finalrdd.map { x => x.mkString("\t") }.saveAsTextFile("/output")
            logger.info("file is written in file")
          }
        }
      }
    }
  }
}
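
For context, the failure described above comes from Hadoop's output check rather than from Spark itself: saveAsTextFile refuses to write into a directory that already exists, so the second call to the same path fails before any data is written. A minimal demonstration (the path is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object SaveTwiceFails {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
    val rdd = sc.parallelize(Seq("a", "b"))
    rdd.saveAsTextFile("/output") // first call succeeds and creates /output
    rdd.saveAsTextFile("/output") // second call throws FileAlreadyExistsException
  }
}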

At present it is unclear how (Coddition check) works in your example. In any case, .saveAsTextFile("/output") should be called only once.

So I would rewrite your example like this:

val finalrdd = outputdatetimerdd
  .map(row => row.mkString("\t"))      // join each row's columns with tabs
  .filter(str => Coddition check str)  // don't know how this Coddition works
// no .collect() before saving: collecting would turn the RDD into a local
// Array, which has no saveAsTextFile
// this part is called only once, not inside a loop
finalrdd.saveAsTextFile("/output")
logger.info("file is written in file")
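
Note also that the loop in the question calls cleaningData once per input file, so even a single saveAsTextFile per call will collide on the second file. Two common ways around this, sketched under the assumption that each call produces an RDD[String] (the placeholder RDDs below stand in for finalrdd):

import org.apache.spark.{SparkConf, SparkContext}

object WriteOncePerRun {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("app").setMaster("local"))
    val filenames = sc.wholeTextFiles("/input/raw_files/").keys.collect()

    // Option 1: one distinct output directory per input file,
    // so no two saves ever target the same HDFS path.
    filenames.foreach { filename =>
      val cleaned = sc.parallelize(Seq("placeholder")) // stand-in for finalrdd
      cleaned.saveAsTextFile("/output/" + filename.split("/").last)
    }

    // Option 2: union the per-file results and save exactly once.
    val perFile = filenames.map(_ => sc.parallelize(Seq("placeholder")))
    sc.union(perFile).saveAsTextFile("/output_merged")
  }
}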
