如何在spark-sql中连接指定大小的数组中的多行



如何在spark-sql中将多行聚合为指定大小的数组。

我尝试使用聚合函数,但无法帮助指定数组的大小。

Input:
|id    |  value  |
|------|---------|
|435725|{abc,def}|
|435725|{ghi,jkl}|
|435725|{mno,pqr}|
|435725|{stu,vwx}|
|536345|{abc,def}|
|536345|{ghi,jkl}|
|536345|{mno,pqr}|
|536345|{stu,vwx}|
Output:
|   id   |       value         |
|--------|---------------------|
|435725_1|[{abc,def},{ghi,jkl}]|
|435725_2|[{mno,pqr},{stu,vwx}]|
|536345_1|[{abc,def},{ghi,jkl}]|
|536345_2|[{mno,pqr},{stu,vwx}]|

您可以首先拆分您的id,以便在分组时将创建相同大小的数组。为此,您可以在窗口上使用基于row_number()的函数。然后使用collect_list函数根据这些新id生成数组分组。

Scala代码如下:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, concat_ws, floor, row_number}
val arraySize = 2
val window = Window.partitionBy("id").orderBy("value")
data
.withColumn(
"id", 
concat_ws(
"_", 
col("id"), 
floor((row_number().over(window) - 1) / arraySize) + 1
)
)
.groupBy("id")
.agg(collect_list(col("value")).as("value"))
.orderBy("id") // optional, to order ids

通过这样做,您将得到以下输出:

+--------+----------------------+
|id      |collect_list(value)   |
+--------+----------------------+
|435725_1|[{abc,def}, {ghi,jkl}]|
|435725_2|[{mno,pqr}, {stu,vwx}]|
|536345_1|[{abc,def}, {ghi,jkl}]|
|536345_2|[{mno,pqr}, {stu,vwx}]|
+--------+----------------------+

相关内容

  • 没有找到相关文章

最新更新