I am working with JSON files in Spark DataFrames. I am trying to parse a file containing JSON strings like the following:
{"id":"00010005","时间值":864359000,"速度":1079,"加速度":19,"la":36.1433530,"lo":-11.51577690}{"id":"00010005","时间值":864360000,"速度":1176,"加速度":10,"la":36.1432660,"lo":-11.51578220}{"id":"00010005","时间值":864361000,"速度":1175,"加速度":,"la":36.1431730,"lo":-11.51578840}{"id":"00010005","时间值":864362000,"速度":1174,"加速度":,"la":36.1430780,"lo":-11.51579410}{"id":"00010005","时间值":864363000,"速度":1285,"加速度":11,"la":36.1429890,"lo":-11.51580110}
The acceleration field sometimes contains no value at all. Spark marks the JSON records that have no acceleration value as _corrupt_record.
val df = sqlContext.read.json(data)
scala> df.show(20)
+--------------------+------------+--------+---------+-----------+-----+----------+
| _corrupt_record|acceleration| id| la| lo|speed|time_value|
+--------------------+------------+--------+---------+-----------+-----+----------+
| null| -1|00010005|36.143418|-11.5157712| 887| 864358000|
| null| 19|00010005|36.143353|-11.5157769| 1079| 864359000|
| null| 10|00010005|36.143266|-11.5157822| 1176| 864360000|
|{"id":"00010005",...| null| null| null| null| null| null|
|{"id":"00010005",...| null| null| null| null| null| null|
I don't want to drop these records. What is the correct way to read JSON records like these?
I tried the following code, which replaces the empty acceleration with a value of 0. But it is not a general solution for the case where the value of any field may be missing:
// Keep only the corrupt records
val df1 = df.select("_corrupt_record").na.drop()
// Patch the empty acceleration with a literal 0 and re-parse as JSON
val stripRdd = df1.rdd.map(x => x.getString(0)).map(x => x.replace(""""acceleration":""", """"acceleration":0"""))
val newDf = sqlContext.read.json(stripRdd)
// Union the repaired records with the ones that parsed cleanly
val trimDf = df.drop("_corrupt_record").na.drop
val finalDf = trimDf.unionAll(newDf)
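A more general variant of the same idea, sketched rather than production-ready: instead of targeting "acceleration" by name, insert an explicit null wherever any field is missing its value, then re-parse using the schema of the clean records. The regex assumes an empty value only ever appears as ":," or ":}" in the raw text, so it would misfire on a quoted string value that happens to contain those characters:

// Keep only the corrupt records
val corrupt = df.select("_corrupt_record").na.drop()
// Give every empty field an explicit null, whatever the field name
val patchedRdd = corrupt.rdd.map(_.getString(0))
  .map(_.replaceAll(""":\s*,""", ":null,"))
  .map(_.replaceAll(""":\s*}""", ":null}"))
// Re-parse with the schema of the clean records so an all-null column
// keeps its numeric type instead of being inferred as NullType
val patchedDf = sqlContext.read.schema(trimDf.schema).json(patchedRdd)
val genericDf = trimDf.unionAll(patchedDf)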
If you have a schema for the record, you can do this easily. Say your schema is called SpeedRecord, with the fields acceleration, id, la, lo, speed, time_value:
import org.apache.spark.sql.Encoders

case class SpeedRecord(acceleration: Int, id: Long, la: Double, lo: Double, speed: Int, time_value: Long)
// Encoders.product derives a schema from a Scala case class; Encoders.bean is for Java beans
val schema = Encoders.product[SpeedRecord].schema
val speedRecord = spark.read.schema(schema).json("/path/data.json")
speedRecord.show()
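One caveat: a record like "acceleration":, is still not syntactically valid JSON, so even with an explicit schema the permissive parser may return such a line as nulls rather than parsing the remaining fields. Where only the field itself comes back null, a hedged follow-up is to fill in a default with na.fill (0 here is just an illustrative default):

// Replace a null acceleration with a default of 0 (na.fill is a standard
// DataFrame method; the Map form targets just this one column)
val filled = speedRecord.na.fill(Map("acceleration" -> 0))
filled.show()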