一次评估多个过滤器



我创建了下面的rdd,我需要对同一数据集执行一系列过滤器以派生不同的计数器和聚合。

有没有办法在一次传递中应用这些过滤器并计算聚合,避免多次遍历同一数据集的火花?

val res = df.rdd.map(row => {
// ............... Generate data here for each row.......
})
res.persist(StorageLevel.MEMORY_AND_DISK)
val all = res.count()
val stats1 = res.filter(row => row.getInt(1) > 0)
val stats1Count = stats1.count()
val stats1Agg = stats1.map(r => r.getInt(1)).mean()
val stats2 = res.filter(row => row.getInt(2) > 0)
val stats2Count = stats2.count()
val stats2Agg = stats2.map(r => r.getInt(2)).mean()

您可以使用聚合:

case class Stats(count: Int = 0, sum: Int = 0) {
def mean = sum/count
def +(s: Stats): Stats = Stats(count + s.count, sum + s.sum)
def <- (n: Int) = if(n > 0) copy(count + 1, sum + n) else this 
}
val (stats1, stats2) = res.aggregate(Stats() -> Stats()) (
{ (s, row) => (s._1 <- row.getInt(1), s._2 <- row.getInt(2)) },
{ _ + _ }
)
val (stat1Count, stats1Agg, stats2Count, stats2Agg) = (stats1.count, stats1.mean, stats2.count, stats2.mean)

最新更新