I was wondering whether there is some way to specify a custom aggregation function for Spark DataFrames. I have a table with two columns, id and value, and I want to group by id and aggregate the values into a list per id, going from:
john | tomato
john | carrot
bill | apple
john | banana
bill | taco
to:
john | tomato, carrot, banana
bill | apple, taco
Is this possible with DataFrames? I am asking about DataFrames because I read the data from an ORC file and it is loaded as a DataFrame. I think converting it to an RDD would be inefficient.
I would simply do the following:
import org.apache.spark.sql.functions.collect_list
import spark.implicits._  // for toDF and the $"col" syntax (assumes a SparkSession named spark)
val df = Seq(("john", "tomato"), ("john", "carrot"),
             ("bill", "apple"), ("john", "banana"),
             ("bill", "taco")).toDF("id", "value")
// df: org.apache.spark.sql.DataFrame = [id: string, value: string]
val aggDf = df.groupBy($"id").agg(collect_list($"value").as("values"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, values: array<string>]
aggDf.show(false)
// +----+------------------------+
// |id  |values                  |
// +----+------------------------+
// |john|[tomato, carrot, banana]|
// |bill|[apple, taco]           |
// +----+------------------------+
You don't even need to call the underlying rdd.
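If you want the exact comma-separated string shown in the question rather than an array column, you can wrap the collected list in concat_ws. A minimal sketch building on the aggDf above (joinedDf is just an illustrative name; row order may vary):

import org.apache.spark.sql.functions.concat_ws

val joinedDf = aggDf.withColumn("values", concat_ws(", ", $"values"))
// joinedDf: org.apache.spark.sql.DataFrame = [id: string, values: string]

joinedDf.show(false)
// +----+----------------------+
// |id  |values                |
// +----+----------------------+
// |john|tomato, carrot, banana|
// |bill|apple, taco           |
// +----+----------------------+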
Reverting to RDD operations tends to work best for problems like this:
scala> val df = sc.parallelize(Seq(("john", "tomato"),
                 ("john", "carrot"), ("bill", "apple"),
                 ("john", "banana"), ("bill", "taco")))
                 .toDF("name", "food")
df: org.apache.spark.sql.DataFrame = [name: string, food: string]
scala> df.show
+----+-------+
|name|   food|
+----+-------+
|john| tomato|
|john| carrot|
|bill|  apple|
|john| banana|
|bill|   taco|
+----+-------+
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val aggregated = df.rdd
.map{ case Row(k: String, v: String) => (k, List(v)) }
.reduceByKey{_ ++ _}
.toDF("name", "foods")
aggregated: org.apache.spark.sql.DataFrame = [name: string, foods: array<string>]
scala> aggregated.collect.foreach{println}
[john,WrappedArray(tomato, carrot, banana)]
[bill,WrappedArray(apple, taco)]
As for efficiency, I believe DataFrames are backed by RDDs under the hood, so a conversion like .rdd costs very little.
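And if you really do need a custom aggregation function (as the question title asks) rather than the built-in collect_list, one option in Spark 1.5/2.x is to extend UserDefinedAggregateFunction. Below is a minimal sketch that re-implements the list-collecting behaviour for a single string column; the class name CollectToList and the usage lines are illustrative only, and this API was later deprecated in Spark 3 in favour of the typed Aggregator:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, DataType, StringType, StructField, StructType}

// Hand-rolled equivalent of collect_list for a single string column.
class CollectToList extends UserDefinedAggregateFunction {
  // a single string input column
  def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  // the aggregation buffer accumulates the values seen so far
  def bufferSchema: StructType = StructType(StructField("values", ArrayType(StringType)) :: Nil)
  // the final result is an array of strings
  def dataType: DataType = ArrayType(StringType)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[String]

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getSeq[String](0) :+ input.getString(0)

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)

  def evaluate(buffer: Row): Any = buffer.getSeq[String](0)
}

// usage against the name/food DataFrame from the example above
val collectToList = new CollectToList
df.groupBy($"name").agg(collectToList($"food").as("foods")).show(false)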