在火花中使用分组时,是否可以获得Dataset<List<Row>>
作为输出。此处的行是原始行。
Dataset<<List<Row>> output = dataset.groupBy("key");
如果使用聚合并且collect_list
则在输出行中,则无法保证列表格式的值按顺序排列。因此,就我而言,这不是一个好的解决方案。
例如:带聚合的输出。但无法保证设定值的顺序。
+-----+----------------------------+
|item1|set |
+-----+----------------------------+
|1 |[[5,3], [4,1], [3,2], [2,2]]|
|2 |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+
请告知是否有办法在不使用pojos的情况下将输出Dataset<List<Row>>
。(例如基于 pojo 的解决方案:FlatMapGroupsWIthStateFunction
(
我有一个解决方案的开始给你。您可以使用monotonically_increasing_id
创建索引并"记住"数据帧的顺序。然后,您可以按键分组,使用 collect_list
聚合结果,按索引对列表进行排序,最后将其删除。
在SparkSQL中存在一个sort_array
函数来对数组进行排序。不幸的是,我不知道在 sparkSQL 数组上有任何等效的 map
函数来删除索引。这就是为什么我提出了一个基于 UDF 的解决方案:
// the UDF that sorts by the index "i" and keeps the value
val sort_and_strip = udf{ (x : WrappedArray[Row]) =>
x.sortBy(_.getAs[Long]("i"))
.map(_.getAs[Long]("value"))
}
// an example of use:
spark.range(7)
.select('id % 3 as "key", 'id as "value")
.withColumn("i", monotonically_increasing_id)
.groupBy("key")
.agg(collect_list(struct('i, 'value)) as "list")
.withColumn("list", sort_and_strip('list))
.show(false)
+---+---------+
|key|list |
+---+---------+
|0 |[0, 3, 6]|
|1 |[1, 4] |
|2 |[2, 5] |
+---+---------+