如何从json字符串提取值



我有一个文件,里面有很多列其中一列jsonstring是字符串类型里面有json字符串。我们设格式如下

{
    "key1": "value1",
    "key2": {
        "level2key1": "level2value1",
        "level2key2": "level2value2"
    }
}

我想这样解析这个列:jsonstring.key1,jsonstring.key2。Level2key1返回value1, level2value1

如何在scala或spark sql中做到这一点

在Spark 2.2中,你可以使用from_json函数为你做JSON解析。

from_json(e: Column, schema: String, options: Map[String, String]): Column将包含JSON字符串的列解析为具有指定模式的StructTypeArrayTypeStructTypes

通过使用*(*)来平展嵌套列,这似乎是最好的解决方案。

// the input dataset (just a single JSON blob)
val jsonstrings = Seq("""{
    "key1": "value1",
    "key2": {
        "level2key1": "level2value1",
        "level2key2": "level2value2"
    }
}""").toDF("jsonstring")
// define the schema of JSON messages
import org.apache.spark.sql.types._
val key2schema = new StructType()
  .add($"level2key1".string)
  .add($"level2key2".string)
val schema = new StructType()
  .add($"key1".string)
  .add("key2", key2schema)
scala> schema.printTreeString
root
 |-- key1: string (nullable = true)
 |-- key2: struct (nullable = true)
 |    |-- level2key1: string (nullable = true)
 |    |-- level2key2: string (nullable = true)
val messages = jsonstrings
  .select(from_json($"jsonstring", schema) as "json")
  .select("json.*") // <-- flattening nested fields
scala> messages.show(truncate = false)
+------+---------------------------+
|key1  |key2                       |
+------+---------------------------+
|value1|[level2value1,level2value2]|
+------+---------------------------+
scala> messages.select("key1", "key2.*").show(truncate = false)
+------+------------+------------+
|key1  |level2key1  |level2key2  |
+------+------------+------------+
|value1|level2value1|level2value2|
+------+------------+------------+

可以使用withColumn + udf + json4s:

import org.json4s.{DefaultFormats, MappingException}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions._
def getJsonContent(jsonstring: String): (String, String) = {
    implicit val formats = DefaultFormats
    val parsedJson = parse(jsonstring)  
    val value1 = (parsedJson  "key1").extract[String]
    val level2value1 = (parsedJson  "key2"  "level2key1").extract[String]
    (value1, level2value1)
}
val getJsonContentUDF = udf((jsonstring: String) => getJsonContent(jsonstring))
df.withColumn("parsedJson", getJsonContentUDF(df("jsonstring")))

相关内容

  • 没有找到相关文章

最新更新