Spark: How to parse multiple JSONs with a list of arrays of structs



I am trying to get the average of the ratings of all JSON objects in a file. I loaded the file and converted it to a data frame, but I get an error while parsing for the avg. Sample request:

{
    "country": "France",
    "customerId": "France001",
    "visited": [
        {
            "placeName": "US",
            "rating": "2.3",
            "famousRest": "N/A",
            "placeId": "AVBS34"
        },
        {
            "placeName": "US",
            "rating": "3.3",
            "famousRest": "SeriousPie",
            "placeId": "VBSs34"
        },
        {
            "placeName": "Canada",
            "rating": "4.3",
            "famousRest": "TimHortons",
            "placeId": "AVBv4d"
        }
    ]
}

So for this JSON, the US average rating would be (2.3 + 3.3)/2 = 2.8.

{
    "country": "Egypt",
    "customerId": "Egypt009",
    "visited": [
        {
            "placeName": "US",
            "rating": "1.3",
            "famousRest": "McDonald",
            "placeId": "Dedcf3"
        },
        {
            "placeName": "US",
            "rating": "3.3",
            "famousRest": "EagleNest",
            "placeId": "CDfet3"
        }
    ]
}
{
    "country": "Canada",
    "customerId": "Canada012",
    "visited": [
        {
            "placeName": "UK",
            "rating": "3.3",
            "famousRest": "N/A",
            "placeId": "XSdce2"
        }
    ]
}

For the Egypt JSON, the US average rating would be (1.3 + 3.3)/2 = 2.3; the Canada JSON has no "US" entries, so it does not count.

So overall, the final average rating would be (2.8 + 2.3)/2 = 2.55 (only two requests have "US" in their visited list).

My schema:

root
 |-- country: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- visited: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- placeId: string (nullable = true)
 |    |    |-- placeName: string (nullable = true)
 |    |    |-- famousRest: string (nullable = true)
 |    |    |-- rating: string (nullable = true)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show() 

So basically I need to get the average rating where placeName = 'US'. That is, for each JSON object: AVG_RATING = (sum of the ratings of its visited entries whose placeName is 'US') / (count of those entries), and FINAL_VALUE = (sum of AVG_RATING over all JSON objects that contain a 'US' placeName) / (count of such JSON objects).
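For the sample data above, the intended two-step computation can be spelled out on plain Scala collections (a minimal sketch; usRatingsPerRequest is a hypothetical name holding the US ratings of each JSON object):

// US ratings per JSON object (France and Egypt; Canada has no US entry)
val usRatingsPerRequest = Seq(Seq(2.3, 3.3), Seq(1.3, 3.3))
// AVG_RATING per request: 2.8 and 2.3
val avgPerRequest = usRatingsPerRequest.map(rs => rs.sum / rs.size)
// FINAL_VALUE: (2.8 + 2.3) / 2 = 2.55
val finalValue = avgPerRequest.sum / avgPerRequest.size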

So far I have tried:

df.registerTempTable("people")
sqlContext.sql("select avg(expResults.rating) from people LATERAL VIEW explode(visited) exploded AS expResults where expResults.placeName = 'US'").collect().foreach(println)

import org.apache.spark.sql.functions.array_contains
val result = df.select("*").where(array_contains(df("visited.placeName"), "US"))

The second statement gives the rows whose visited array contains "US", but I am not sure how to parse through the list of structs.
Can anyone tell me how to do this?

It looks like you want something like this:

import org.apache.spark.sql.functions.{avg, explode}
import sqlContext.implicits._  // enables the $"..." column syntax

val result = df
  .withColumn("visit", explode($"visited"))    // explode the visited array into one row per visit
  .groupBy($"customerId", $"visit.placeName")  // group by customer and place using dot syntax
  .agg(avg($"visit.rating".cast("double")).alias("tmp"))  // per-customer average per place
  .groupBy($"placeName").agg(avg($"tmp").alias("value"))  // average of those averages per place

After that, you can filter it for the country of your choice:

result.where($"placeName" === "US").show
// +---------+-----+
// |placeName|value|
// +---------+-----+
// |       US| 2.55|
// +---------+-----+

A less elegant approach is to use a UDF:

import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// For one row: average the ratings of the visits matching the given country;
// Try(...).toOption turns any failure into None
def userAverage(country: String) = udf((visits: Seq[Row]) => Try {
  val filtered = visits
    .filter(_.getAs[String]("placeName") == country)
    .map(_.getAs[String]("rating").toDouble)
  filtered.sum / filtered.size
}.toOption)
df.select(userAverage("US")($"visited").as("tmp")).na.drop.agg(avg("tmp"))
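
Note that when a row has no matching visits, filtered.sum / filtered.size is 0.0 / 0, which evaluates to NaN rather than throwing, so the Try still succeeds; na.drop then removes both null and NaN values before the final aggregation, which should again yield 2.55 for the sample data.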

Note: This follows the description provided in the question and computes the average of averages, unlike the accepted answer. For a simple average:

val result = df
  .select(explode($"visited").alias("visit"))
  .groupBy($"visit.placeName")
  .agg(avg($"visit.rating".cast("double")))
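
For this particular sample, the flat average over all four US ratings, (2.3 + 3.3 + 1.3 + 3.3) / 4, also happens to be 2.55, but in general the two definitions differ.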

Here is my approach to solving your problem:

val DF = sqlContext.jsonFile("sample.json")

DF.registerTempTable("temp")

sqlContext.sql("""
  select place_and_rating.placeName as placeName,
         avg(place_and_rating.rating) as avg_rating
  from temp
  lateral view explode(visited) exploded_table as place_and_rating
  where place_and_rating.placeName = 'US'
  group by place_and_rating.placeName
""").show()
