触发多个动态聚合函数,计数不工作



具有多个动态聚合操作的 Spark 数据帧上的聚合。

我想使用 Scala 对 Spark 数据帧进行聚合,并执行多个动态聚合操作(由用户在 JSON 中传递)。我正在将 JSON 转换为Map.

下面是一些示例数据:

colA    colB    colC    colD
1       2       3       4
5       6       7       8
9       10      11      12

我正在使用的 Spark 聚合代码:

var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)

我必须仅aggFuncMapMap传递,以便用户可以通过 JSON 配置传递任意数量的聚合。

上面的代码对于一些聚合工作正常,包括summinmaxavgcount

但是,不幸的是,此代码不适用于countDistinct(也许是因为它是骆驼大小写?

运行上述代码时,我收到此错误:

线程"main"中的异常 org.apache.spark.sql.AnalysisException: Undefined 函数: 'countdistinct'.此函数既不是已注册的临时函数,也不是在数据库"默认"中注册的永久函数

任何帮助将不胜感激!

目前无法在Map内将aggcountDistinct一起使用。从文档中我们看到:

可用的聚合方法包括平均值、最大值、最小值、总和、计数。


一种可能的解决方法是将Map更改为Seq[Column]

val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

但是,如果用户要在配置文件中指定聚合,这将没有多大帮助。

另一种方法是使用expr,此函数将计算一个字符串并返回一列。但是,expr不接受"countDistinct",而是需要使用"count(distinct(...))"。 这可以编码如下:

val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

相关内容

  • 没有找到相关文章

最新更新