Spark分组和自定义聚合



我有如下数据,

n1  d1  un1 mt1 1
n1  d1  un1 mt2 2
n1  d1  un1 mt3 3
n1  d1  un1 mt4 4
n1  d2  un1 mt1 3
n1  d2  un1 mt3 3
n1  d2  un1 mt4 4
n1  d2  un1 mt5 6
n1  d2  un1 mt2 3

我想得到如下输出

n1 d1 un1 0.75
n1 d2 un1 1.5

i,我们对第1、2、3列做一个groupby,对第4列做一个groupby,第4列= within group, (mt1+mt2)/mt4

我正试图用Spark DF做同样的事情假设数据位于数据框架a中,列名为n、d、un、mt、r我正在尝试这个。

sqlContext.udf.register("aggUDF",(v:List(mt,r))=> ?)
val b = a.groupBy("n","d","un").agg(callUdf("aggUDF",List((mt,r)) should go here))

如果我理解正确的话,您首先要计算mt1和mt2的行之和,然后除以mt4中每个不同的n1,d1, un1的行之和。

虽然可以像上面回答的那样使用自定义聚合函数,但您也可以使用一点蛮力(我将在pyspark中展示它,但您应该能够轻松转换为scala)。

假设您的原始数据框架名为df,列按顺序排列:n,d,un,mt,r

首先为mt1、mt2和mt4分别创建一个新列,如下所示:

from pyspark.sql import functions as F
newdf = df.withColumn("mt1", when(df.mt == "mt1", df.r).otherwise(0).alias("mt1"))
newdf = newdf .withColumn("mt2", when(newdf.mt == "mt2", newdf .r).otherwise(0).alias("mt2"))
newdf = newdf .withColumn("mt4", when(newdf.mt == "mt4", newdf .r).otherwise(0).alias("mt4"))

现在对前3个值进行分组,并对新3个值进行汇总。

aggregated = newdf.groupBy(["n","d","n"]).agg(F.sum(newdf.mt1).alias("sum_mt1"),F.sum(newdf.mt2).alias("sum_mt2"), F.sum(newdf.mt4).alias("sum_mt4"))

现在只做计算:

final = aggregated.withColumn("res", (aggregated.sum_mt1 +  aggregated.sum_mt2) / aggregated.sum_mt4)    

不是最优雅的解决方案,但它可能适合你…

目前(Spark 1.4)还不支持自定义聚合函数。但是,您可以使用Hive udaf。您可以在Spark中看到一个Hive用户定义聚合函数(UDAF)的示例。

相关内容

  • 没有找到相关文章

最新更新