spark-aggregatebykey-sum和同一调用中的running average



我正在学习spark,没有hadoop的经验。

问题

我正试图在对aggregateByKey的同一次调用中计算总和和平均值。

让我分享一下我迄今为止所做的尝试。

设置数据

val categoryPrices = List((1, 20), (1, 25), (1, 10), (1, 45))
val categoryPricesRdd = sc.parallelize(categoryPrices)

尝试计算对aggregateByKey的同一调用中的平均值。这不起作用

val zeroValue1 = (0, 0, 0.0) // (count, sum, average)
categoryPricesRdd.
aggregateByKey(zeroValue1)(
(tuple, prevPrice) => {
val newCount = tuple._1 + 1
val newSum = tuple._2 + prevPrice
val newAverage = newSum/newCount
(newCount, newSum, newAverage)
},
(tuple1, tuple2) => {
val newCount1 = tuple1._1 + tuple2._1
val newSum1 = tuple1._2 + tuple2._2
// TRYING TO CALCULATE THE RUNNING AVERAGE HERE
val newAverage1 = ((tuple1._2 * tuple1._1) + (tuple2._2 * tuple2._1))/(tuple1._1 + tuple2._1)
(newCount1, newSum1, newAverage1)
}
).
collect.
foreach(println)

结果:每次打印不同的平均值

  • 第一次:(1,(4100,70.0((
  • 第二次:(1,(4100,52.0((

只需先求和,然后在单独的运算中计算平均值。这是有效的

val zeroValue2 = (0, 0) // (count, sum, average)
categoryPricesRdd.
aggregateByKey(zeroValue2)(
(tuple, prevPrice) => {
val newCount = tuple._1 + 1
val newSum = tuple._2 + prevPrice
(newCount, newSum)
},
(tuple1, tuple2) => {
val newCount1 = tuple1._1 + tuple2._1
val newSum1 = tuple1._2 + tuple2._2
(newCount1, newSum1)
}
).
map(rec => {
val category = rec._1
val count = rec._2._1
val sum = rec._2._2
(category, count, sum, sum/count)
}).
collect.
foreach(println)

每次打印相同的结果:(1,4100,25(

我想我理解seqOp和CombOp之间的区别。考虑到一个操作可以在不同服务器上的多个分区中分割数据,我的理解是seqOp对单个分区中的数据进行操作,然后combOp组合从不同分区接收的数据。如果这是错误的,请纠正。

然而,有一些非常基本的东西我不理解。看来我们无法同时计算同一呼叫中的总和和平均值。如果这是真的,请帮我理解原因。

seqOp:中与average聚合相关的计算

val newAverage = newSum/newCount

combOp:中

val newAverage1 = ((tuple1._2 * tuple1._1) + (tuple2._2 * tuple2._1)) / (tuple1._1 + tuple2._1)

不正确。

假设前三个元素在一个分区中,最后一个元素在另一个分区。您的seqOp将生成(计数、总和、平均值(元组,如下所示:

Partition #1: [20, 25, 10]
--> (1, 20, 20/1)
--> (2, 45, 45/2)
--> (3, 55, 55/3)
Partition #2: [45]
--> (1, 45, 45/1)

接下来,跨分区combOp将把来自两个分区的2个元组组合起来,得到:

((55 * 3) + (45 * 1)) / 4
// Result: 52

从上面的步骤中可以看出,如果RDD元素或分区的顺序不同,则average值可能会不同。

您的第二种方法是有效的,因为average根据定义是总和超过总数,因此最好在首先计算总和和计数值后进行计算。

最新更新