Spark Streaming Scala: combine JSONs of different structures into a single DataFrame



I am trying to process JSON strings coming from Kinesis. The JSON strings can arrive in several different forms. From Kinesis I create a DStream:

val kinesisStream = KinesisUtils.createStream(
  ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
  "region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

val lines = kinesisStream.map(x => new String(x))

lines.foreachRDD((rdd, time) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits.StringToColumn

  if (rdd.count() > 0) {
    // Process JSONs here
    // JSON strings here would have either one of the formats below
  }
})

The strings in the RDD will hold one of these JSON payloads. A collection:

[
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30024,
      "TargetId": "4138",
      "Timestamp": 0
    },
    "host": "host1"
  },
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30025,
      "TargetId": "4139",
      "Timestamp": 0
    },
    "host": "host1"
  }
]

Some of the JSON strings are single objects, like this one:

{
  "ApplicationVersion": "1.0.3 (65)",
  "ProjectId": 30026,
  "TargetId": "4140",
  "Timestamp": 0
}

I want to extract the objects from the "data" key when the string has the first form, combine them with JSONs of the second form, and build a single RDD/DataFrame from the result. How can I achieve that?

Ultimately, I would like my DataFrame to look like this:

+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30026|    4140|        0|
+------------------+---------+--------+---------+

Sorry, I am new to Scala and Spark. I have been looking at existing examples but unfortunately have not found a solution.

Many thanks in advance.

This example uses json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val format = DefaultFormats

case class jsonschema(ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp: Int)
val string1 = """
[ {
"data" : {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30024,
"TargetId" : "4138",
"Timestamp" : 0
},
"host" : "host1"
}, {
"data" : {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4139",
"Timestamp" : 0
},
"host" : "host1"
} ]
"""
val string2 = """
[ {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4140",
"Timestamp" : 0
}, {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4141",
"Timestamp" : 0
} ]
"""
// json4s's `\` operator pulls the "data" object out of each array element
val json1 = (parse(string1) \ "data").extract[List[jsonschema]]
val json2 = parse(string2).extract[List[jsonschema]]
val combined = json1.union(json2) // a List, not an RDD; createDataFrame accepts it directly
val df = sqlContext.createDataFrame(combined)
df.show

+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30025|    4140|        0|
|        1.0.3 (65)|    30025|    4141|        0|
+------------------+---------+--------+---------+

You can use a union after selecting the data.* columns from the first DataFrame:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

// Assuming you store your jsons in two separate strings `json1` and `json2`
val df1 = spark.read.json(sc.parallelize(Seq(json1)))
val df2 = spark.read.json(sc.parallelize(Seq(json2)))

df1.select($"data.*") // Select only the data columns from the first DataFrame
  .union(df2)         // Union the two DataFrames, as they now have the same structure
  .show()
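Putting this together with the streaming loop from the question, a minimal sketch could look like the one below. This is only a sketch under stated assumptions: it assumes each micro-batch may mix both shapes, that records of the first form always start with `[` (a made-up heuristic for splitting, not something Spark provides), and that both halves of the batch are non-empty so the schemas line up for the union.

```scala
// Sketch: `spark` is the SparkSession, `lines` is the DStream[String] from the question.
import spark.implicits._

lines.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    // Made-up heuristic: wrapped records arrive as a JSON array, bare records as an object.
    val wrapped = rdd.filter(_.trim.startsWith("["))   // [{"data": {...}, "host": ...}, ...]
    val bare    = rdd.filter(!_.trim.startsWith("["))  // {...}

    val df1 = spark.read.json(wrapped).select($"data.*") // flatten the "data" objects
    val df2 = spark.read.json(bare)

    df1.union(df2).show()
  }
}
```

In production you would also have to handle the case where one of the two filtered RDDs is empty for a given batch, since reading an empty RDD yields an empty schema and the union would fail.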

EDIT [link to another solution]

After you edited the question, I understand that you need some kind of fallback mechanism when parsing the JSON records. There are more ways to do this with any JSON parsing library - Play has a nice solution, and I think the linked answer already explains how to solve this in an elegant way.

Once you have an RDD[Data], where Data is your "variant" type, you can simply convert it to a DataFrame with rdd.toDF().
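With json4s, such a fallback could be sketched as a small helper that pattern-matches on the parsed shape: a JSON array is treated as the wrapped {"data": ...} form, a bare object is extracted directly. This is a sketch, not the linked Play solution; `parseRecord` is a made-up name, and it assumes the `jsonschema` case class from the first answer is in scope.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// Made-up helper: normalize either JSON shape into a List[jsonschema].
def parseRecord(s: String): List[jsonschema] = parse(s) match {
  case arr: JArray  => (arr \ "data").extract[List[jsonschema]] // array of {"data": ..., "host": ...}
  case obj: JObject => List(obj.extract[jsonschema])            // bare single object
  case _            => Nil                                      // ignore anything unparseable
}

// Then, per micro-batch:
// val dataRDD = rdd.flatMap(parseRecord) // RDD[jsonschema]
// dataRDD.toDF().show()
```

Because `flatMap` flattens the per-record lists, both shapes end up as plain `jsonschema` rows in one RDD, which `toDF()` turns into the desired four-column DataFrame.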

Hope that helps.
