aggregateByKey 不会更新初始集的值



hllTotal中的值会更新,但每个键hllToday值都保持为零。
任何人都可以帮忙,为什么hllToday在这里没有更新?

val hllToday: HllSerializable = new HllSerializable(new HLL(13, 5))
val hllTotal: HllSerializable = new HllSerializable(new HLL(13, 5))
val initialSet = (hllToday, hllTotal)
val rdd7 = combinedRdd.map(tuple => {
tuple.segmentId -> (tuple.entryTime, tuple.exitTime, tuple.hll)`
}).aggregateByKey(initialSet)(
(acc: (HllSerializable, HllSerializable), v: (Long, Long, HllSerializable)) => {
if (v._1 == today_ts) {
acc._1.getHll.union(v._3.getHll)
}
acc._2.getHll.union(v._3.getHll)
acc
}, (acc1: (HllSerializable, HllSerializable), acc2: (HllSerializable, HllSerializable)) =>    {
acc1._1.getHll.union(acc2._1.getHll)
acc1._2.getHll.union(acc2._2.getHll)
acc1
}
)

显然这段代码从未被调用:

if (v._1 == today_ts) {
acc._1.getHll.union(v._3.getHll)
}

尝试将if (v._1 == today_ts)替换为if(true)以查看hllToday是否更新

最新更新