我正在尝试在数据帧上运行聚合。然后我想计算每列上不同的值。我生成了一个字典,用于聚合,比如:
from pyspark.sql.functions import countDistinct
expr = {x: "countDistinct" for x in df.columns if x is not 'id'}
df.groupBy("id").agg(expr).show()
我得到错误:
AnalysisException:未定义的函数:"countdistinct"。此函数既不是已注册的临时函数,也不是在数据库"default"中注册的永久函数。;
如果我直接使用"countDistinct",它会起作用:
df.groupBy("id").agg(countDistinct('hours'))
Out[1]:DataFrame[id:int,count(hours(:bigint]
这不起作用:
df.groupBy("id").agg({'hours':'countDistinct'}).show()
AnalysisException:未定义的函数:"countdistinct"。此函数既不是已注册的临时函数,也不是在数据库"default"中注册的永久函数。;
对此有什么想法吗?
似乎countDistinct
不是"内置聚合函数"。
将不同计数的列直接传递给agg
将解决此问题:
cols = [countDistinct(x) for x in df.columns if x != 'id']
df.groupBy('id').agg(*cols).show()
如果我们做一些类似的事情,会更灵活
from pyspark.sql.functions import countDistinct, sum
agg_expn = {'hours':countDistinct, 'somethingelse': sum}
temp = [agg_expn[col] for col in df.columns if agg_expn.get(col, None)]
df.groupby(['id']).agg(*temp)
这允许不同列的不同聚合函数