How to reduce a List[Key, List[Name, Value]] in Spark



This is the structure of my model:

package object summary {
  case class NameValuePair(name: String, value: Long)
  case class Result(key: String, pairs: List[NameValuePair])
  case class Data(data: List[Result])
}

The data will look like this:

[
  Result("Paris", List(NameValuePair("apples", 10), NameValuePair("oranges", 20), NameValuePair("peaches", 30))),
  Result("Paris", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40))),
  Result("NY", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40))),
  Result("NY", List(NameValuePair("apples", 40), NameValuePair("oranges", 30), NameValuePair("peaches", 10))),
  Result("London", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40)))
]

and I want the following output:

[
  ("Paris", [("apples", 30), ("oranges", 50), ("peaches", 70)]),
  ("NY", [("apples", 60), ("oranges", 60), ("peaches", 50)]),
  ("London", [("apples", 20), ("oranges", 30), ("peaches", 40)])
]

I want to find the sum of the fruit counts for each city. How can I do this with Spark?

You can achieve this using Spark RDDs, like this.

I recreated your data in order to build the RDD:

val data_test = List(
  Result("Paris", List(NameValuePair("apples", 10), NameValuePair("oranges", 20), NameValuePair("peaches", 30))),
  Result("Paris", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40))),
  Result("NY", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40))),
  Result("NY", List(NameValuePair("apples", 40), NameValuePair("oranges", 30), NameValuePair("peaches", 10))),
  Result("London", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40)))
)

Then I created an RDD from data_test and transformed it with the following code:

val rdd_data = sc.parallelize(data_test)
// One ((city, fruit), value) record per position in the pairs list
// (this hard-codes exactly three pairs per Result).
val rdd_1 = rdd_data.map(x => ((x.key, x.pairs(0).name), x.pairs(0).value))
val rdd_2 = rdd_data.map(x => ((x.key, x.pairs(1).name), x.pairs(1).value))
val rdd_3 = rdd_data.map(x => ((x.key, x.pairs(2).name), x.pairs(2).value))
val rdd_final = rdd_1.union(rdd_2).union(rdd_3)
// Sum the values per (city, fruit) key.
val rdd_reduce = rdd_final.reduceByKey((x, y) => x + y)
// Regroup by city, collecting the (fruit, total) pairs into a list.
val rdd_transformed = rdd_reduce.map(x => (x._1._1, (x._1._2, x._2))).groupByKey().map(x => (x._1, x._2.toList))
rdd_transformed.foreach(println)

The result obtained is as follows:

(NY,List((peaches,50), (apples,60), (oranges,60)))
(London,List((apples,20), (peaches,40), (oranges,30)))
(Paris,List((oranges,50), (peaches,70), (apples,30)))

[Edit after comments] If the number of pairs varies, you can define a function like this:

def func(res: Result): List[((String, String), Long)] =
  // One ((city, fruit), value) record per pair, however many pairs there are.
  res.pairs.map(p => ((res.key, p.name), p.value))
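For example, applied to a single record it yields one ((city, fruit), value) entry per pair (an illustrative call with made-up values):

func(Result("Paris", List(NameValuePair("apples", 10), NameValuePair("oranges", 20))))
// => List((("Paris","apples"),10), (("Paris","oranges"),20))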

Then you can jump straight to the line above where I generate rdd_final, like this:

val rdd_final = rdd_data.flatMap(x=>func(x))

and execute the remaining instructions in the same way.
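As a side note, here is a minimal sketch of an alternative (assuming the same sc and data_test as above) that merges per-city fruit maps with reduceByKey instead of regrouping everything with groupByKey:

// Represent each Result as a Map(fruit -> value), then merge the maps per city.
val byCity = sc.parallelize(data_test)
  .map(r => (r.key, r.pairs.map(p => p.name -> p.value).toMap))
  .reduceByKey { (a, b) =>
    // Point-wise sum of two fruit maps; a fruit missing from one map counts as 0.
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap
  }
  .mapValues(_.toList)

byCity.foreach(println)

Because the merge function is associative and commutative, Spark can pre-aggregate on the map side before shuffling.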

I would do it with the DataFrame grouping functions:

import spark.implicits._

Seq(
  Result("Paris", List(NameValuePair("apples", 10), NameValuePair("oranges", 20), NameValuePair("peaches", 30))),
  Result("Paris", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40))),
  Result("NY", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40))),
  Result("NY", List(NameValuePair("apples", 40), NameValuePair("oranges", 30), NameValuePair("peaches", 10))),
  Result("London", List(NameValuePair("apples", 20), NameValuePair("oranges", 30), NameValuePair("peaches", 40)))
).flatMap { row =>
  // Flatten each Result into one (city, fruit, value) row per pair.
  row.pairs.map(f => (row.key, f.name, f.value))
}.toDF("city", "fruit", "value")
  .groupBy("city").sum().show()
The result would be:
+------+----------+
|  city|sum(value)|
+------+----------+
|London|        90|
| Paris|       150|
|    NY|       170|
+------+----------+
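Note that groupBy("city").sum() collapses all fruits into a single total per city. To get the per-fruit sums the question asks for, a small variation (a sketch, assuming the DataFrame built above is bound to a val named df) groups by both columns:

// One summed row per (city, fruit) instead of one row per city.
df.groupBy("city", "fruit")
  .sum("value")
  .show()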
