我试图在一个文件中获得所有json对象的评级的平均值。我加载了文件并转换为数据帧,但在解析avg时出现错误。示例请求:
{
"country": "France",
"customerId": "France001",
"visited": [
{
"placeName": "US",
"rating": "2.3",
"famousRest": "N/A",
"placeId": "AVBS34"
},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "SeriousPie",
"placeId": "VBSs34"
},
{
"placeName": "Canada",
"rating": "4.3",
"famousRest": "TimHortons",
"placeId": "AVBv4d"
}
]
}
所以对于这个json,美国的平均评级将是(2.3 + 3.3)/2 = 2.8
{
"country": "Egypt",
"customerId": "Egypt009",
"visited": [
{
"placeName": "US",
"rating": "1.3",
"famousRest": "McDonald",
"placeId": "Dedcf3"
},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "EagleNest",
"placeId": "CDfet3"
},
}
{
"country": "Canada",
"customerId": "Canada012",
"visited": [
{
"placeName": "UK",
"rating": "3.3",
"famousRest": "N/A",
"placeId": "XSdce2"
},
]
}
= (3.3 +1.3)/2 = 2.3
所以总的来说,平均评分将是:(2.8 + 2.3)/2 = 2.55(只有两个请求在他们的访问列表中有"US")
My schema:
root
|-- country: string(nullable=true)
|-- customerId:string(nullable=true)
|-- visited: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- placeId: string (nullable = true)
| | |-- placeName: string (nullable = true)
| | |-- famousRest: string (nullable = true)
| | |-- rating: string (nullable = true)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show()
当做:
val app = df.select("strategies"); app.registerTempTable("app"); app.printSchema(); app.show()
app.foreach({
t => t.select("placeName", "rating").where(t("placeName") == "US")
}).show()
I am getting :
<console>:31: error: value select is not a member of org.apache.spark.sql.Row t => t.select("placeName", "rating").where(t("placeName") == "US") ^
谁能告诉我我哪里做错了?
假设app
是Dataframe
(你的代码示例是不可理解的…你创建一个df
变量并查询一个app
变量),你不应该调用foreach
来从中选择:
app.select("placeName", "rating").where(t("placeName") == "US")
foreach
将对每条记录(Row
类型)调用一个函数。这主要用于调用一些副作用(例如打印到控制台/发送到外部服务等)。大多数情况下,您不会使用它来选择/转换数据框架。
:
对于最初如何计算仅在美国访问的平均值的问题:
// explode to make a record out of each "visited" Array item,
// taking only "placeName" and "rating" columns
val exploded: DataFrame = df.explode(df("visited")) {
case Row(visits: Seq[Row]) =>
visits.map(r => (r.getAs[String]("placeName"), r.getAs[String]("rating")))
}
// make some order: rename columns named _1, _2 (since we used a tuple),
// and cast ratings to Double:
val ratings: DataFrame = exploded
.withColumnRenamed("_1", "placeName")
.withColumn("rating", exploded("_2").cast(DoubleType))
.select("placeName", "rating")
ratings.printSchema()
ratings.show()
/* prints:
root
|-- placeName: string (nullable = true)
|-- rating: double (nullable = true)
+---------+------+
|placeName|rating|
+---------+------+
| US| 1.3|
| US| 3.3|
| UK| 3.3|
+---------+------+
*/
// now filter US only and get average rating:
val avg = ratings
.filter(ratings("placeName") === "US")
.select(mean("rating"))
avg.show()
/* prints:
+-----------+
|avg(rating)|
+-----------+
| 2.3|
+-----------+
*/