我正在尝试使用Spark Scala捕获特定的数据子集我部分能够完成它,但在这里寻求指导。
数据如下:
<表类>
Member_id
Start_Date
End_Date
产品
tbody><<tr>abcd 20220101 20221201 Prod1 abcd 20220101 20230201 Prod2 efgh 20220101 20221201 Prod1 efgh 20220101 20230201 Prod1 ijkl 20220201 20230201 Prod1 mnop 20220501 20221201 Prod1 mnop 20220501 20230201 Prod2 qrst 20220501 20230201 Prod2 表类>
这里并不需要窗口函数。通常,当您需要每行的输出时,您会使用窗口函数。因为你想对member_id
和start_date
的组进行操作,你可以使用groupBy
并对其进行聚合。我在spark-shell
中运行了您的示例:
val testDF = Seq(
("abcd",20220101,20221201,"Prod1"),
("abcd",20220101,20230201,"Prod2"),
("efgh",20220101,20221201,"Prod1"),
("efgh",20220101,20230201,"Prod1"),
("ijkl",20220201,20230201,"Prod1"),
("mnop",20220501,20221201,"Prod1"),
("mnop",20220501,20230201,"Prod2"),
("qrst",20220501,20230201,"Prod2")
).toDF("member_id", "start_date", "end_date", "product")
testDF.groupBy("member_id", "start_date")
.agg(countDistinct("product").as("distinct_products"))
.filter(col("distinct_products") > 1)
.show
输出:
+---------+----------+-----------------+
|member_id|start_date|distinct_products|
+---------+----------+-----------------+
| abcd| 20220101| 2|
| mnop| 20220501| 2|
+---------+----------+-----------------+