I have a DataFrame, and I need to run two different groupBy operations on the same DataFrame.
+----+------+-------+-------+----------------------+
| id | type | item  | value | timestamp            |
+----+------+-------+-------+----------------------+
| 1  | rent | dvd   | 12    | 2016-09-19T00:00:00Z |
| 1  | rent | dvd   | 12    | 2016-09-19T00:00:00Z |
| 1  | buy  | tv    | 12    | 2016-09-20T00:00:00Z |
| 1  | rent | movie | 12    | 2016-09-20T00:00:00Z |
| 1  | buy  | movie | 12    | 2016-09-18T00:00:00Z |
| 1  | buy  | movie | 12    | 2016-09-18T00:00:00Z |
+----+------+-------+-------+----------------------+
The result I want is:
id : 1
totalValue : 72 --- group by based on id
typeCount : {"rent" : 3, "buy" : 3} --- group by based on id
itemCount : {"dvd" : 2, "tv" : 1, "movie" : 3 } --- group by based on id
typeForDay : {"rent" : 2, "buy" : 2} --- group by based on id and dayofmonth(col("timestamp")), counting each type at most once per day
I tried:
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named spark
// Count how many times each distinct value occurs in a collected array.
val count_by_value = udf { (listValues: Seq[String]) =>
  if (listValues == null) null else listValues.groupBy(identity).map { case (k, v) => k -> v.size }
}
val group1 = df.groupBy("id").agg(collect_list("type"), sum("value") as "totalValue", collect_list("item"))
val group1Result = group1.withColumn("typeCount", count_by_value($"collect_list(type)"))
  .drop("collect_list(type)")
  .withColumn("itemCount", count_by_value($"collect_list(item)"))
  .drop("collect_list(item)")
val group2 = df.groupBy("id", dayofmonth(col("timestamp"))).agg(collect_set("type"))
val group2Result = group2.withColumn("typeForDay", count_by_value($"collect_set(type)"))
  .drop("collect_set(type)")
// show() returns Unit, so keep the joined DataFrame in the val and display it separately.
val groupedResult = group1Result.join(group2Result, "id")
groupedResult.show()
But this takes a long time. Is there a more efficient way to do it?
A better approach is to add each grouping field to the key and reduce by key instead of using groupBy(). You can use the following:
df1.rdd.map(rec => (rec(0), rec(3).toString.toInt)).
  reduceByKey(_ + _).take(5).foreach(println)
=> (1,72)
df1.rdd.map(rec => ((rec(0), rec(1)), 1)).
  reduceByKey(_ + _).map(x => (x._1._1, x._1._2, x._2)).
  take(5).foreach(println)
=> (1,rent,3)
   (1,buy,3)
df1.rdd.map(rec => ((rec(0), rec(2)), 1)).
  reduceByKey(_ + _).map(x => (x._1._1, x._1._2, x._2)).
  take(5).foreach(println)
=> (1,dvd,2)
   (1,tv,1)
   (1,movie,3)
df1.rdd.map(rec => ((rec(0), rec(1), rec(4).toString.substring(8, 10)), 1)).
  reduceByKey(_ + _).map(x => (x._1._1, x._1._2, x._1._3, x._2)).
  take(5).foreach(println)
=> (1,rent,19,2)
   (1,buy,20,1)
   (1,buy,18,2)
   (1,rent,20,1)
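
The per-key counts above still have to be folded into the shape asked for in the question (one record per id, with a map for each count). The following is only a rough sketch of that folding on plain Scala collections, so it runs without Spark. The names Event, Summary, KeyThenReduce, countBy and summarize are hypothetical and introduced purely for illustration; with an RDD, the role of countBy would be played by map(v => (v, 1)).reduceByKey(_ + _).

```scala
// Plain-Scala sketch (no Spark) of folding per-key counts into one record per id.
// Event, Summary, countBy and summarize are hypothetical names for illustration.
final case class Event(id: Int, tpe: String, item: String, value: Int, timestamp: String)

final case class Summary(
  totalValue: Int,
  typeCount: Map[String, Int],
  itemCount: Map[String, Int],
  typeForDay: Map[String, Int]
)

object KeyThenReduce {
  // Sample rows copied from the table in the question.
  val events: Seq[Event] = Seq(
    Event(1, "rent", "dvd", 12, "2016-09-19T00:00:00Z"),
    Event(1, "rent", "dvd", 12, "2016-09-19T00:00:00Z"),
    Event(1, "buy", "tv", 12, "2016-09-20T00:00:00Z"),
    Event(1, "rent", "movie", 12, "2016-09-20T00:00:00Z"),
    Event(1, "buy", "movie", 12, "2016-09-18T00:00:00Z"),
    Event(1, "buy", "movie", 12, "2016-09-18T00:00:00Z")
  )

  // Count how often each value occurs in a sequence.
  def countBy[A](xs: Seq[A]): Map[A, Int] =
    xs.groupBy(identity).map { case (k, vs) => k -> vs.size }

  def summarize(rows: Seq[Event]): Map[Int, Summary] =
    rows.groupBy(_.id).map { case (id, rs) =>
      // Deduplicate (type, day) pairs first so each type counts once per day.
      // substring(8, 10) extracts the day of month, as in the snippet above.
      val typeDays = rs.map(r => (r.tpe, r.timestamp.substring(8, 10))).distinct
      id -> Summary(
        totalValue = rs.map(_.value).sum,
        typeCount = countBy(rs.map(_.tpe)),
        itemCount = countBy(rs.map(_.item)),
        typeForDay = countBy(typeDays.map(_._1))
      )
    }

  def main(args: Array[String]): Unit =
    summarize(events).foreach { case (id, s) => println(s"$id -> $s") }
}
```

For the sample rows this gives a totalValue of 72 and a typeForDay of 2 for both rent and buy, which matches the result requested in the question. Whether this is actually faster than the groupBy version on a real cluster is something you would need to measure on your own data.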