如何在spark-sql中将多行聚合为指定大小的数组。
我尝试使用聚合函数,但无法帮助指定数组的大小。
Input:
|id | value |
|------|---------|
|435725|{abc,def}|
|435725|{ghi,jkl}|
|435725|{mno,pqr}|
|435725|{stu,vwx}|
|536345|{abc,def}|
|536345|{ghi,jkl}|
|536345|{mno,pqr}|
|536345|{stu,vwx}|
Output:
| id | value |
|--------|---------------------|
|435725_1|[{abc,def},{ghi,jkl}]|
|435725_2|[{mno,pqr},{stu,vwx}]|
|536345_1|[{abc,def},{ghi,jkl}]|
|536345_2|[{mno,pqr},{stu,vwx}]|
您可以首先拆分您的id,以便在分组时将创建相同大小的数组。为此,您可以在窗口上使用基于row_number()
的函数。然后使用collect_list
函数根据这些新id生成数组分组。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, concat_ws, floor, row_number}
val arraySize = 2
val window = Window.partitionBy("id").orderBy("value")
data
.withColumn(
"id",
concat_ws(
"_",
col("id"),
floor((row_number().over(window) - 1) / arraySize) + 1
)
)
.groupBy("id")
.agg(collect_list(col("value")).as("value"))
.orderBy("id") // optional, to order ids
通过这样做,您将得到以下输出:
+--------+----------------------+
|id |collect_list(value) |
+--------+----------------------+
|435725_1|[{abc,def}, {ghi,jkl}]|
|435725_2|[{mno,pqr}, {stu,vwx}]|
|536345_1|[{abc,def}, {ghi,jkl}]|
|536345_2|[{mno,pqr}, {stu,vwx}]|
+--------+----------------------+