Parsing JSON with reserved characters in Spark



I have a JSON input.txt file containing data like the following:

2018-05-30.txt:{"Message":{"eUuid":"6e7d4890-9279-491a-ae4d-70416ef9d42d","schemaVersion":"1.0-AB1","timestamp":1527539376,"id":"XYZ","location":{"dim":{"x":2,"y":-7},"towards":121.0},"source":"a","UniqueId":"test123","code":"del","signature":"xyz","":{},"vel":{"ground":15},"height":{},"next":{"dim":{}},"sub":"del1"}}
2018-05-30.txt:{"Message":{"eUuid":"5e7d4890-9279-491a-ae4d-70416ef9d42d","schemaVersion":"1.0-AB1","timestamp":1627539376,"id":"ABC","location":{"dim":{"x":1,"y":-8},"towards":132.0},"source":"b","UniqueId":"hello123","code":"fra","signature":"abc","":{},"vel":{"ground":16},"height":{},"next":{"dim":{}},"sub":"fra1"}}
...

I tried to load the JSON into a DataFrame as follows:

val df = spark.read.json("<full path of input.txt file>")

but I am getting back a DataFrame that contains only a _corrupt_record column.

I understand that the "." in the file-name prefix (2018-05-30.txt) is a reserved character and is what causes the problem. How can I solve this?
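
For completeness, this is how the corrupt output can be inspected (a sketch; the cache() call works around newer Spark versions refusing queries that reference only the internal _corrupt_record column of a raw file):

val raw = spark.read.json("<full path of input.txt file>").cache()
raw.printSchema()                        // root contains only: _corrupt_record
raw.select("_corrupt_record").show(2, false)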

val rdd = sc.textFile("/Users/kishore/abc.json")
// Split each line on "txt:" and keep the second part, i.e. the JSON payload
val jsonRdd = rdd.map(x => x.split("txt:")(1))

scala> sqlContext.read.json(jsonRdd).show
+--------------------+
|             Message|
+--------------------+
|[test123,del,6e7d...|
|[hello123,fra,5e7...|
+--------------------+

import org.apache.spark.sql.functions._
import sqlContext.implicits._

// Flatten the nested Message struct into individual top-level columns
val df = sqlContext.read.json(jsonRdd)
  .withColumn("eUuid", $"Message"("eUuid"))
  .withColumn("schemaVersion", $"Message"("schemaVersion"))
  .withColumn("timestamp", $"Message"("timestamp"))
  .withColumn("id", $"Message"("id"))
  .withColumn("source", $"Message"("source"))
  .withColumn("UniqueId", $"Message"("UniqueId"))
  .withColumn("location", $"Message"("location"))
  .withColumn("dim", $"location"("dim"))
  .withColumn("x", $"dim"("x"))
  .withColumn("y", $"dim"("y"))
  .drop("dim")
  .withColumn("vel", $"Message"("vel"))
  .withColumn("ground", $"vel"("ground"))
  .withColumn("sub", $"Message"("sub"))
  .drop("Message")
df.show()
+--------------------+-------------+----------+---+------+--------+------------+---+---+----+------+----+
|               eUuid|schemaVersion| timestamp| id|source|UniqueId|    location|  x|  y| vel|ground| sub|
+--------------------+-------------+----------+---+------+--------+------------+---+---+----+------+----+
|6e7d4890-9279-491...|      1.0-AB1|1527539376|XYZ|     a| test123|[[2,-7],121]|  2| -7|[15]|    15|del1|
+--------------------+-------------+----------+---+------+--------+------------+---+---+----+------+----+
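
As a side note, the long withColumn chain is not strictly required: a star expansion on the struct column pulls every field of Message up to the top level, and nested fields can be addressed by dot path. A minimal sketch, assuming the same jsonRdd and imports as above:

val flat = sqlContext.read.json(jsonRdd)
  .select($"Message.*")                  // expand all fields of the Message struct
  .withColumn("x", $"location.dim.x")    // nested fields by dot path
  .withColumn("y", $"location.dim.y")
  .withColumn("ground", $"vel.ground")
flat.show()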

The problem is not a reserved character; it is that the file does not contain valid JSON, since every line starts with a file-name prefix. So you can do:

val df = spark.read.textFile(...)
// drop(15) removes the 15-character "2018-05-30.txt:" prefix from each line
val json = spark.read.json(df.map(v => v.drop(15)))
json.printSchema()
root
 |-- Message: struct (nullable = true)
 |    |-- UniqueId: string (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- eUuid: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- dim: struct (nullable = true)
 |    |    |    |-- x: long (nullable = true)
 |    |    |    |-- y: long (nullable = true)
 |    |    |-- towards: double (nullable = true)
 |    |-- schemaVersion: string (nullable = true)
 |    |-- signature: string (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- sub: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- vel: struct (nullable = true)
 |    |    |-- ground: long (nullable = true)
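
If you would rather not depend on the prefix being exactly 15 characters, a hedged alternative is to cut each line at the first '{' (a sketch; it assumes the prefix itself never contains '{'):

import spark.implicits._   // encoder for Dataset[String]

val df = spark.read.textFile("<full path of input.txt file>")
// keep everything from the first '{' onward, i.e. the JSON payload
val json = spark.read.json(df.map(v => v.substring(v.indexOf('{'))))
json.printSchema()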
