How to query a Spark SQL DataFrame where a StringType column contains a JSON value



I'm trying to run SQL against a Spark DataFrame, but one of the columns is a string whose content is a JSON-like structure.

I registered the DataFrame as a temporary table: testTable

When I run DESCRIBE on it, I get:

col_name                       data_type
requestId                       string  
name                            string  
features                        string  

But the features value is JSON:

{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}

I just want to query testTable for rows where totalSpent > 10. Can someone tell me how to do that?
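
Ideally I want to write something like the sketch below (assuming get_json_object is available in the deployment — it's a DataFrame function since Spark 1.6, and in plain SQL it may require a HiveContext on 1.x; it returns a string, hence the cast):

// extract totalSpent from the JSON string by path, cast it, and filter
sqlContext.sql(
  """SELECT requestId, name, features
    |FROM testTable
    |WHERE CAST(get_json_object(features, '$.totalSpent') AS INT) > 10""".stripMargin
).show()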

My JSON file looks like:

   {
        "requestId": 232323,
        "name": "ravi",
        "features": "{\"places\":11,\"movies\":2,\"totalPlacesVisited\":0,\"totalSpent\":13,\"SpentMap\":{\"Movie\":2,\"Park Visit\":11},\"benefits\":{\"freeTime\":13}}"
    }

features is a string; I only need totalSpent from it. I tried:

import org.apache.spark.sql.types._

val features = StructType(Array(
  StructField("totalSpent", LongType, true),
  StructField("movies", LongType, true)
))
val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StringType, true),
  StructField("features", features, true)
))
val records = sqlContext.read.schema(schema).json(filePath)

since each request has one features JSON string. But this gave me an error — presumably because the file stores features as a quoted string, not a nested object, so it can't match the struct schema.

When I try

val records = sqlContext.jsonFile(filePath)
records.printSchema

it shows me:

root
 |-- requestId: string (nullable = true)
 |-- features: string (nullable = true)
 |-- name: string (nullable = true)

Can I use parallelize inside a StructField when creating the schema? I first tried:

val customer = StructField("features", StringType, true)
val events = sc.parallelize(customer :: Nil)

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StructType(events, true), true),
  StructField("features", features, true)
))

This also gave me errors (I suspect because StructType expects a Seq[StructField], not an RDD). I also tried:

import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
This gives me:
<console>:78: error: object liftweb is not a member of package net
       import net.liftweb.json.parse

Then I tried:

val parseJson = udf((s: String) => {
  sqlContext.read.json(s)
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

But again an error (which makes sense in hindsight: a SQLContext can't be used inside a UDF, and read.json returns a DataFrame, not a value a UDF can return). Then I tried:

import org.json4s._
import org.json4s.jackson.JsonMethods._

val parseJson = udf((s: String) => {
  parse(s)
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

But it gives me:

java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

This gives me the correct schema (based on the answer given by zero323):

import scala.util.Try
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

val extractFeatures = udf((features: String) => Try {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
}.toOption)
val parsed = records.withColumn("features", extractFeatures($"features"))

parsed.printSchema

But when I query:

val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent")

value.show gives null — presumably extract threw and .toOption silently turned the failure into None.
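
To surface the underlying exception instead of a silent null, I suppose one could temporarily drop .toOption (a debugging sketch, not from the answer):

val extractFeaturesStrict = udf((features: String) => {
  // without Try/.toOption, a parse or extract failure now throws with the real error
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
})
records.withColumn("features", extractFeaturesStrict($"features")).show()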

When you return data from a UDF it has to be representable as a SQL type, and a JSON AST is not. One approach is to define a case class similar to this one:

case class Features(
  places: Integer, 
  movies: Integer,
  totalPlacesVisited: Integer, 
  totalSpent: Integer,
  SpentMap: Map[String, Integer],
  benefits: Map[String, Integer]
) 

and use it to extract the object:

import org.json4s._
import org.json4s.jackson.JsonMethods._

val df = Seq((
  232323, "ravi",
  """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
)).toDF("requestId", "name", "features")

val extractFeatures = udf((features: String) => {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
})

val parsed = df.withColumn("features", extractFeatures($"features"))
parsed.show(false)
// +---------+----+-----------------------------------------------------------------+
// |requestId|name|features                                                         |
// +---------+----+-----------------------------------------------------------------+
// |232323   |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
// +---------+----+-----------------------------------------------------------------+
parsed.printSchema
// root
//  |-- requestId: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- features: struct (nullable = true)
//  |    |-- places: integer (nullable = true)
//  |    |-- movies: integer (nullable = true)
//  |    |-- totalPlacesVisited: integer (nullable = true)
//  |    |-- totalSpent: integer (nullable = true)
//  |    |-- SpentMap: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)
//  |    |-- benefits: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)
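
With features parsed into a struct, the filter from the original question can be expressed directly (a minimal sketch using the names above):

// rows where totalSpent > 10, selecting the fields of interest
parsed.filter($"features.totalSpent" > 10)
  .select($"requestId", $"name", $"features.totalSpent")
  .show()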

Depending on the remaining records and the expected usage, you should adjust the representation and add the relevant error-handling logic.
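
For example, a minimal way to add such error handling (essentially the Try pattern already shown in the question) is:

import scala.util.Try

// records that fail to parse or extract become null instead of failing the job
val extractFeaturesSafe = udf((features: String) => Try {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
}.toOption)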

You can also use the DSL to access individual fields as strings:

val getMovieSpent = udf((s: String) => 
  compact(render(parse(s) \ "SpentMap" \ "Movie")))
df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show
// +---------+----+--------------------+-----------+
// |requestId|name|            features|movie_spent|
// +---------+----+--------------------+-----------+
// |   232323|ravi|{"places":11,"mov...|          2|
// +---------+----+--------------------+-----------+
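
The same pattern can answer the original totalSpent > 10 question without defining a case class (a sketch; getTotalSpent is an illustrative name, not from the answer):

val getTotalSpent = udf((s: String) =>
  compact(render(parse(s) \ "totalSpent")))

df.withColumn("total_spent", getTotalSpent($"features").cast("bigint"))
  .filter($"total_spent" > 10)
  .show()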

For alternative approaches, see How to query JSON data column using Spark DataFrames?
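
One such alternative, on Spark 2.1 or later (a version assumption; the question itself uses the older sqlContext API), is the built-in from_json, which parses a string column against a schema without any UDF:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val featuresSchema = StructType(Array(
  StructField("totalSpent", LongType, true),
  StructField("movies", LongType, true)
))

// from_json turns the JSON string column into a struct matching the schema
val withFeatures = df.withColumn("features", from_json($"features", featuresSchema))
withFeatures.filter($"features.totalSpent" > 10).show()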
