Spark dataframe aggregation scala


val df = sc.parallelize(Seq((a, 1), (a, null), (b, null)(b, 2),(b, 3),(c, 2),(c, 4),(c, 3))).toDF("col1","col2")

输出应如下所示。

col1 col2
a    null
b    null
c    4

我知道 groupBy on col1 并得到 col2 的最大值。 我可以使用df.groupBy("col1").agg("col2"->"max")执行

但我的要求是,如果 null 在那里,我想选择该记录,但如果 null 不存在,我想选择 col2 的最大值。

我怎么能做到这一点,任何人都可以帮我。

正如我评论的那样,您对null的使用会使事情产生不必要的问题,因此,如果您首先不能在没有null的情况下工作,我认为将其变成更有用的东西是最有意义的:

val df = sparkContext.parallelize(Seq((a, 1), (a, null), (b, null), (b, 2),(b, 3),(c, 2),(c, 4),(c, 3)))
.mapValues { v => Option(v) match {
case Some(i: Int) => i
case _ => Int.MaxValue
}
}.groupBy(_._1).map {
case (k, v) => k -> v.map(_._2).max
}

首先,我使用Option来摆脱null并将树下的东西从Any移动到Int,这样我就可以享受更多的类型安全性。我用MaxValue替换null,原因我稍后会解释。

然后我像你一样groupBy,但随后我map组,将键与最大值配对,这将是您的原始数据项之一,MaxValuenull曾经的位置。如果必须,你可以把它们变回null,但我不会。

可能有一种更简单的方法可以完成所有这些工作,但我喜欢用MaxValue替换null,模式匹配可以帮助我缩小类型范围,以及我可以在之后对所有内容都一视同仁的事实。

相关内容

  • 没有找到相关文章