I'm new to the Spark ecosystem and am trying to write data from CSV to Parquet, but it fails with a NullPointerException. I'm not sure what I'm missing.
case class PdRecordData(id: String, dates: String, dayOfWeek: String, pdDistrict: String, address: String, longitude: String, latitude: String)
val sqlContext = new SQLContext(sc)
sqlContext.createParquetFile[PdRecordData]("C:\\AS\\Parquet", true, new Configuration()).registerTempTable("PdRegistry")
val csvFile = sc.textFile("C:\\AS\\crimeratedata\\samplefromorg.csv")
val rowsWithHeader = csvFile.map { x => x.split(",").map { _.trim } }
val maps = rowsWithHeader.map { case Array(id, dates, dayOfWeek, pdDistrict, address, longitude, latitude) => PdRecordData(id, dates, dayOfWeek, pdDistrict, address, longitude, latitude) }
maps.foreach { x => sqlContext.sql("INSERT INTO PdRegistry SELECT " + "'" + x.id + "','" + x.dates + "','" + x.dayOfWeek + "','" + x.pdDistrict + "','" + x.address + "','" + x.longitude + "','" + x.latitude + "'") }
It throws the following exception:
java.lang.NullPointerException
at org.apache.spark.sql.SQLConf$class.getConf(SQLConf.scala:175)
at org.apache.spark.sql.SQLContext.getConf(SQLContext.scala:50)
at org.apache.spark.sql.SQLConf$class.dialect(SQLConf.scala:85)
at org.apache.spark.sql.SQLContext.dialect(SQLContext.scala:50)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:302)
Try this instead. The NullPointerException most likely comes from calling sqlContext.sql inside maps.foreach: that closure runs in executor tasks, where the deserialized SQLContext has no usable internal state (hence the NPE in SQLConf.getConf). Build the DataFrame on the driver and write it out directly:
val df = sqlContext.createDataFrame(maps)
df.write.parquet("C:\\AS\\Parquet")
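For completeness, here is a minimal end-to-end sketch of the same job. It assumes Spark 1.4+ (where DataFrame.write is available) and that the first CSV line is a header that should be dropped; the object name CsvToParquet and the header handling are my additions, not part of your code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Define the case class at top level so Spark can derive the schema via reflection.
    case class PdRecordData(id: String, dates: String, dayOfWeek: String, pdDistrict: String, address: String, longitude: String, latitude: String)

    object CsvToParquet {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("CsvToParquet"))
        val sqlContext = new SQLContext(sc)

        val csvFile = sc.textFile("C:\\AS\\crimeratedata\\samplefromorg.csv")
        val header = csvFile.first()   // assumes the first line is a header row
        val rows = csvFile.filter(_ != header).map(_.split(",").map(_.trim))

        // collect with a partial function skips malformed rows instead of throwing a MatchError
        val records = rows.collect {
          case Array(id, dates, dayOfWeek, pdDistrict, address, longitude, latitude) =>
            PdRecordData(id, dates, dayOfWeek, pdDistrict, address, longitude, latitude)
        }

        val df = sqlContext.createDataFrame(records)   // built on the driver, not inside an RDD closure
        df.write.parquet("C:\\AS\\Parquet")
        sc.stop()
      }
    }

You can then verify the output by reading it back on the driver, e.g. sqlContext.read.parquet("C:\\AS\\Parquet").show().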