我有一个文件,里面有很多列其中一列jsonstring
是字符串类型里面有json字符串。我们设格式如下
{
"key1": "value1",
"key2": {
"level2key1": "level2value1",
"level2key2": "level2value2"
}
}
我想这样解析这个列:jsonstring.key1,jsonstring.key2。Level2key1返回value1, level2value1
如何在scala或spark sql中做到这一点
在Spark 2.2中,你可以使用from_json函数为你做JSON解析。
from_json(e: Column, schema: String, options: Map[String, String]): Column将包含JSON字符串的列解析为具有指定模式的
StructType
或ArrayType
的StructTypes
。
通过使用*
(*)来平展嵌套列,这似乎是最好的解决方案。
// the input dataset (just a single JSON blob)
val jsonstrings = Seq("""{
"key1": "value1",
"key2": {
"level2key1": "level2value1",
"level2key2": "level2value2"
}
}""").toDF("jsonstring")
// define the schema of JSON messages
import org.apache.spark.sql.types._
val key2schema = new StructType()
.add($"level2key1".string)
.add($"level2key2".string)
val schema = new StructType()
.add($"key1".string)
.add("key2", key2schema)
scala> schema.printTreeString
root
|-- key1: string (nullable = true)
|-- key2: struct (nullable = true)
| |-- level2key1: string (nullable = true)
| |-- level2key2: string (nullable = true)
val messages = jsonstrings
.select(from_json($"jsonstring", schema) as "json")
.select("json.*") // <-- flattening nested fields
scala> messages.show(truncate = false)
+------+---------------------------+
|key1 |key2 |
+------+---------------------------+
|value1|[level2value1,level2value2]|
+------+---------------------------+
scala> messages.select("key1", "key2.*").show(truncate = false)
+------+------------+------------+
|key1 |level2key1 |level2key2 |
+------+------------+------------+
|value1|level2value1|level2value2|
+------+------------+------------+
可以使用withColumn + udf + json4s:
import org.json4s.{DefaultFormats, MappingException}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions._
def getJsonContent(jsonstring: String): (String, String) = {
implicit val formats = DefaultFormats
val parsedJson = parse(jsonstring)
val value1 = (parsedJson "key1").extract[String]
val level2value1 = (parsedJson "key2" "level2key1").extract[String]
(value1, level2value1)
}
val getJsonContentUDF = udf((jsonstring: String) => getJsonContent(jsonstring))
df.withColumn("parsedJson", getJsonContentUDF(df("jsonstring")))