Pyspark聚合使用带有countDistinct函数的dictionary



我正在尝试在数据帧上运行聚合。然后我想计算每列上不同的值。我生成了一个字典,用于聚合,比如:

from pyspark.sql.functions import countDistinct
expr = {x: "countDistinct" for x in df.columns if x is not 'id'}
df.groupBy("id").agg(expr).show()

我得到错误:

AnalysisException:未定义的函数:"countdistinct"。此函数既不是已注册的临时函数,也不是在数据库"default"中注册的永久函数。;

如果我直接使用"countDistinct",它会起作用:

df.groupBy("id").agg(countDistinct('hours'))

Out[1]:DataFrame[id:int,count(hours(:bigint]

这不起作用:

df.groupBy("id").agg({'hours':'countDistinct'}).show()

AnalysisException:未定义的函数:"countdistinct"。此函数既不是已注册的临时函数,也不是在数据库"default"中注册的永久函数。;

对此有什么想法吗?

似乎countDistinct不是"内置聚合函数"。

将不同计数的列直接传递给agg将解决此问题:

cols = [countDistinct(x) for x in df.columns if x != 'id']
df.groupBy('id').agg(*cols).show()

如果我们做一些类似的事情,会更灵活

from pyspark.sql.functions import countDistinct, sum
agg_expn = {'hours':countDistinct, 'somethingelse': sum}
temp = [agg_expn[col] for col in df.columns if agg_expn.get(col, None)]

df.groupby(['id']).agg(*temp)

这允许不同列的不同聚合函数

最新更新