Okay, I'm new to Spark and Scala, and I've been trying to implement data cleaning in Spark. The code below checks a column for missing values, stores them in outputrdd, and runs a loop to calculate the missing values. The code works fine when there is only one missing value in the file. Since HDFS does not allow writing to the same location twice, it fails when there are multiple missing values. Could you help me write finalrdd to a specific location once all occurrences of missing values have been calculated?
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("app").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  val files = sc.wholeTextFiles("/input/raw_files/")
  val file = files.map { case (filename, content) => filename }
  file.collect.foreach(filename => {
    cleaningData(filename)
  })

  def cleaningData(file: String) = {
    // headers has the column headers of the files
    var hdr = headers.toString()
    var vl = hdr.split("\t")
    sqlContext.clearCache()
    if (hdr.contains("COLUMN_HEADER")) {
      // Checks for missing values in the dataframe and stores them in outputrdd
      if (!outputrdd.isEmpty()) {
        logger.info("value is zero then performing further operation")
        val outputdatetimedf = sqlContext.sql("select date,'\t',time from cpc where kwh = 0")
        val outputdatetimerdd = outputdatetimedf.rdd
        val strings = outputdatetimerdd.map(row => row.mkString).collect()
        for (i <- strings) {
          if (Coddition check) {
            // Calculates the missing value and stores it in finalrdd
            finalrdd.map { x => x.mkString("\t") }.saveAsTextFile("/output")
            logger.info("file is written in file")
          }
        }
      }
    }
  }
}
It is not clear how (Coddition check) works in your example. In any case, .saveAsTextFile("/output") should only be called once. So I would rewrite your example like this:
// note: no '.collect()' here, so finalrdd stays an RDD and saveAsTextFile works
val finalrdd = outputdatetimerdd
  .map(row => row.mkString("\t"))     // join each row's fields with tabs
  .filter(str => Coddition check str) // don't know how this Coddition works
// this part is called only once, not in a loop
finalrdd.saveAsTextFile("/output")
logger.info("file is written in file")
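Since the question also mentions that HDFS refuses a write when the target path already exists, here is a minimal sketch of two common workarounds: deleting the old output directory with Hadoop's FileSystem API before writing, or giving each input file its own output subdirectory. The outputFor helper and the /output base path are illustrative assumptions, not part of the original code.

import org.apache.hadoop.fs.{FileSystem, Path}

// Option 1 (assumption: overwriting old results is acceptable):
// delete the existing output directory before calling saveAsTextFile.
val fs = FileSystem.get(sc.hadoopConfiguration)
val outPath = new Path("/output")
if (fs.exists(outPath)) {
  fs.delete(outPath, true) // 'true' deletes the directory recursively
}
finalrdd.saveAsTextFile("/output")

// Option 2 (assumption: the result of every input file should be kept):
// write each file's result to a distinct subdirectory, so no path is reused.
def outputFor(inputFile: String): String = // hypothetical helper
  "/output/" + new Path(inputFile).getName
finalrdd.saveAsTextFile(outputFor(file))

Option 2 also addresses the loop over multiple input files in the original main: each call to cleaningData then writes to its own location instead of failing on the second write.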