How to perform two different groupBy operations on the same DataFrame in Scala



I have a DataFrame, and I need to apply two different groupBy operations to it.

+----+------+-------+-------+----------------------+
| id | type | item  | value | timestamp            |
+----+------+-------+-------+----------------------+
| 1  | rent | dvd   | 12    | 2016-09-19T00:00:00Z |
| 1  | rent | dvd   | 12    | 2016-09-19T00:00:00Z |
| 1  | buy  | tv    | 12    | 2016-09-20T00:00:00Z |
| 1  | rent | movie | 12    | 2016-09-20T00:00:00Z |
| 1  | buy  | movie | 12    | 2016-09-18T00:00:00Z |
| 1  | buy  | movie | 12    | 2016-09-18T00:00:00Z |
+----+------+-------+-------+----------------------+

The result I want is:

id : 1
totalValue  : 72 --- group by based on id
typeCount : {"rent" : 3, "buy" : 3} --- group by based on id
itemCount : {"dvd" : 2, "tv" : 1, "movie" : 3 } --- group by based on id
typeForDay : {"rent" : 2, "buy" : 2 }  --- group by based on id and dayofmonth(col("timestamp")), at most 1 type per day

I tried:

val count_by_value = udf {( listValues :scala.collection.mutable.WrappedArray[String]) => if (listValues == null) null else  listValues.groupBy(identity).mapValues(_.size)}

val group1 = df.groupBy("id").agg(collect_list("type"),sum("value") as "totalValue", collect_list("item")) 
val group1Result =  group1.withColumn("typeCount", count_by_value($"collect_list(type)"))
                          .drop("collect_list(type)")
                          .withColumn("itemCount", count_by_value($"collect_list(item)"))
                          .drop("collect_list(item)")

val group2 = df.groupBy("id", dayofmonth(col("timestamp"))).agg(collect_set("type")) 
val group2Result =  group2.withColumn("typeForDay", count_by_value($"collect_set(type)"))
                          .drop("collect_set(type)")

val groupedResult = group1Result.join(group2Result, "id").show()

But this takes a long time. Is there a more efficient way to do it?

A better approach is to put each grouping field into the key and reduce by key, instead of using groupBy(). You can use these:

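// .rdd yields a pair RDD for reduceByKey (on Spark 2.x+, Dataset.map does not return an RDD)
df1.rdd.map(rec => (rec(0), rec(3).toString().toInt)).
     reduceByKey(_+_).take(5).foreach(println)

=> (1,72)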

// reduce on the (id, type) key first, then flatten each result to a triple
df1.rdd.map(rec => ((rec(0), rec(1)), 1)).
    reduceByKey(_+_).
    map(x => (x._1._1, x._1._2, x._2)).take(5).foreach(println)

=> (1,rent,3)
   (1,buy,3)

// reduce on the (id, item) key first, then flatten each result to a triple
df1.rdd.map(rec => ((rec(0), rec(2)), 1)).
    reduceByKey(_+_).
    map(x => (x._1._1, x._1._2, x._2)).take(5).foreach(println)

=> (1,dvd,2)
   (1,tv,1)
   (1,movie,3)

// key is (id, type, day of month); substring(8,10) picks the day out of the timestamp
df1.rdd.map(rec => ((rec(0), rec(1), rec(4).toString().substring(8,10)), 1)).
    reduceByKey(_+_).map(x => (x._1._1, x._1._2, x._1._3, x._2)).
    take(5).foreach(println)

=> (1,rent,19,2)
   (1,buy,20,1)
   (1,buy,18,2)
   (1,rent,20,1)
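
To make the composite-key idea easier to check without a cluster, here is a minimal sketch that reproduces the four reductions above on plain Scala collections. It is not Spark code: Rec, sumByKey, day and KeyedReduceSketch are hypothetical names introduced only for this illustration, and sumByKey is a local stand-in for reduceByKey(_+_). The last step, which is not part of the snippets above, assumes that "at most 1 type per day" means each (id, type, day) key should be counted once.

```scala
// Plain-Scala stand-in for one row of the question's table (hypothetical type).
final case class Rec(id: Int, tpe: String, item: String, value: Int, timestamp: String)

object KeyedReduceSketch {
  // The six rows from the question.
  val rows: Seq[Rec] = Seq(
    Rec(1, "rent", "dvd",   12, "2016-09-19T00:00:00Z"),
    Rec(1, "rent", "dvd",   12, "2016-09-19T00:00:00Z"),
    Rec(1, "buy",  "tv",    12, "2016-09-20T00:00:00Z"),
    Rec(1, "rent", "movie", 12, "2016-09-20T00:00:00Z"),
    Rec(1, "buy",  "movie", 12, "2016-09-18T00:00:00Z"),
    Rec(1, "buy",  "movie", 12, "2016-09-18T00:00:00Z")
  )

  // Hypothetical helper: local equivalent of reduceByKey(_ + _) on (key, Int) pairs.
  def sumByKey[K](pairs: Seq[(K, Int)]): Map[K, Int] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  // Day of month taken from the timestamp string, as substring(8,10) does above.
  def day(r: Rec): String = r.timestamp.substring(8, 10)

  // Sum of value per id.
  val totalValue: Map[Int, Int] = sumByKey(rows.map(r => (r.id, r.value)))

  // Row counts per (id, type) and per (id, item).
  val typeCount: Map[(Int, String), Int] = sumByKey(rows.map(r => ((r.id, r.tpe), 1)))
  val itemCount: Map[(Int, String), Int] = sumByKey(rows.map(r => ((r.id, r.item), 1)))

  // Row counts per (id, type, day), matching the last snippet above.
  val typePerDay: Map[(Int, String, String), Int] =
    sumByKey(rows.map(r => ((r.id, r.tpe, day(r)), 1)))

  // Assumption: count every (id, type, day) key once to get "at most 1 type per day".
  val typeForDay: Map[(Int, String), Int] =
    sumByKey(typePerDay.keys.toSeq.map { case (id, tpe, _) => ((id, tpe), 1) })

  def main(args: Array[String]): Unit = {
    println(totalValue)
    println(typeCount)
    println(itemCount)
    println(typePerDay)
    println(typeForDay)
  }
}
```

On an RDD the same final step could be expressed by mapping the keys of the (id, type, day) reduction to ((id, type), 1) and calling reduceByKey(_+_) once more, so the per-day counts are reused rather than recomputed from the raw rows.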
