我有如下数据,
n1 d1 un1 mt1 1
n1 d1 un1 mt2 2
n1 d1 un1 mt3 3
n1 d1 un1 mt4 4
n1 d2 un1 mt1 3
n1 d2 un1 mt3 3
n1 d2 un1 mt4 4
n1 d2 un1 mt5 6
n1 d2 un1 mt2 3
我想得到如下输出
n1 d1 un1 0.75
n1 d2 un1 1.5
i,我们对第1、2、3列做一个groupby,对第4列做一个groupby,第4列= within group, (mt1+mt2)/mt4
我正试图用Spark DF做同样的事情假设数据位于数据框架a中,列名为n、d、un、mt、r我正在尝试这个。
sqlContext.udf.register("aggUDF",(v:List(mt,r))=> ?)
val b = a.groupBy("n","d","un").agg(callUdf("aggUDF",List((mt,r)) should go here))
如果我理解正确的话,您首先要计算mt1和mt2的行之和,然后除以mt4中每个不同的n1,d1, un1的行之和。
虽然可以像上面回答的那样使用自定义聚合函数,但您也可以使用一点蛮力(我将在pyspark中展示它,但您应该能够轻松转换为scala)。
假设您的原始数据框架名为df,列按顺序排列:n,d,un,mt,r
首先为mt1、mt2和mt4分别创建一个新列,如下所示:
from pyspark.sql import functions as F
newdf = df.withColumn("mt1", when(df.mt == "mt1", df.r).otherwise(0).alias("mt1"))
newdf = newdf .withColumn("mt2", when(newdf.mt == "mt2", newdf .r).otherwise(0).alias("mt2"))
newdf = newdf .withColumn("mt4", when(newdf.mt == "mt4", newdf .r).otherwise(0).alias("mt4"))
现在对前3个值进行分组,并对新3个值进行汇总。
aggregated = newdf.groupBy(["n","d","n"]).agg(F.sum(newdf.mt1).alias("sum_mt1"),F.sum(newdf.mt2).alias("sum_mt2"), F.sum(newdf.mt4).alias("sum_mt4"))
现在只做计算:
final = aggregated.withColumn("res", (aggregated.sum_mt1 + aggregated.sum_mt2) / aggregated.sum_mt4)
不是最优雅的解决方案,但它可能适合你…
目前(Spark 1.4)还不支持自定义聚合函数。但是,您可以使用Hive udaf。您可以在Spark中看到一个Hive用户定义聚合函数(UDAF)的示例。