Custom aggregation on Spark DataFrames



I'm wondering if there is some way to specify a custom aggregation function for Spark DataFrames. If I have a table with two columns, id and value, I want to group by id and aggregate the values into a list for each id, like this:

From:

john | tomato
john | carrot
bill | apple
john | banana
bill | taco

To:

john | tomato, carrot, banana
bill | apple, taco

Is this possible with DataFrames? I'm asking about DataFrames because I read the data from an ORC file and it is loaded as a DataFrame. I think converting it to an RDD would be inefficient.

I would just do the following:

import org.apache.spark.sql.functions.collect_list
val df = Seq(("john", "tomato"), ("john", "carrot"), 
             ("bill", "apple"), ("john", "banana"), 
             ("bill", "taco")).toDF("id", "value")
// df: org.apache.spark.sql.DataFrame = [id: string, value: string]
val aggDf = df.groupBy($"id").agg(collect_list($"value").as("values"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, values: array<string>]
aggDf.show(false)
// +----+------------------------+
// |id  |values                  |
// +----+------------------------+
// |john|[tomato, carrot, banana]|
// |bill|[apple, taco]           |
// +----+------------------------+

You don't even need to call into the underlying rdd.
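
If you want the exact comma-separated string from the question rather than an array column, you can wrap the collected list in concat_ws. A small sketch building on the same df as above (the joinedDf name is just for illustration):

import org.apache.spark.sql.functions.{collect_list, concat_ws}

// concat_ws joins the elements of the collected array with the given separator
val joinedDf = df.groupBy($"id")
  .agg(concat_ws(", ", collect_list($"value")).as("values"))
// joinedDf: org.apache.spark.sql.DataFrame = [id: string, values: string]
joinedDf.show(false)
// +----+----------------------+
// |id  |values                |
// +----+----------------------+
// |john|tomato, carrot, banana|
// |bill|apple, taco           |
// +----+----------------------+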

Reverting to RDD operations tends to work best for problems like this:

scala> val df = sc.parallelize(Seq(("john", "tomato"),
           ("john", "carrot"), ("bill", "apple"),
           ("john", "banana"), ("bill", "taco")))
           .toDF("name", "food")
df: org.apache.spark.sql.DataFrame = [name: string, food: string]
scala> df.show
+----+------+
|name|  food|
+----+------+
|john|tomato|
|john|carrot|
|bill| apple|
|john|banana|
|bill|  taco|
+----+------+
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val aggregated = df.rdd
           .map{ case Row(k: String, v: String) => (k, List(v)) }
           .reduceByKey{_ ++ _}
           .toDF("name", "foods")
aggregated: org.apache.spark.sql.DataFrame = [name: string, foods: array<string>]
scala> aggregated.collect.foreach{println}
[john,WrappedArray(tomato, carrot, banana)]
[bill,WrappedArray(apple, taco)]

As for efficiency, I believe DataFrames are backed by RDDs under the hood, so a conversion like .rdd is very cheap.
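
And if the goal really is a custom aggregation function rather than the built-in collect_list, DataFrames support that too. A minimal sketch, assuming Spark 3.0 or later where org.apache.spark.sql.functions.udaf is available; the ConcatValues object and the ", " separator are my own illustration, applied to the name/food DataFrame from the snippet above:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical custom aggregator: concatenates a group's string values with ", "
object ConcatValues extends Aggregator[String, String, String] {
  def zero: String = ""                              // empty buffer
  def reduce(acc: String, v: String): String =       // fold one value into the buffer
    if (acc.isEmpty) v else acc + ", " + v
  def merge(a: String, b: String): String =          // combine partial buffers
    if (a.isEmpty) b else if (b.isEmpty) a else a + ", " + b
  def finish(acc: String): String = acc              // final result per group
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}

val concatFoods = udaf(ConcatValues)
df.groupBy($"name").agg(concatFoods($"food").as("foods")).show(false)
// +----+----------------------+
// |name|foods                 |
// +----+----------------------+
// |john|tomato, carrot, banana|
// |bill|apple, taco           |
// +----+----------------------+
// (element order within a group is not guaranteed)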
