如何在groupByKey之后的聚合方法中创建TypedColumn?假设我们有以下数据结构,并创建了一个bean(MyObject(来读取as数据集。
USER | LIST | PARTNER_LIST
Dataset<MyObject> r = sp.read()
.parquet("mypath/*").as(Encoders.bean(MyObject.class));
//group by list
r.groupByKey((MapFunction< MyObject, String>) v ->
v.getList(), Encoders.STRING())
.agg(TypedColumn<MyObject,R> what??? )
在agg函数中,我需要传递一个TypedColumn,但我在网上找不到任何解释我如何做到这一点的东西
有人能给我举个例子吗?
顾名思义,TypedColumn是一个具有Type的列。
- 可以使用
as[]
运算符添加类型 - 也可以使用
name()
方法添加别名
玩具示例:
import org.apache.spark.sql.functions._
val exampleDF = Seq(("Bob", "male"), ("Mary", "female"),
("Mike", "male"), ("Mike", "male")).toDF("name", "gender")
exampleDF.groupByKey(_.getAs[String]("gender")).
agg(countDistinct("name").as[Long].name("distinctCnt")).show()
+------+-----------+
| value|distinctCnt|
+------+-----------+
|female| 1|
| male| 2|
+------+-----------+