火花:按行列表输出获取组



在火花中使用分组时,是否可以获得Dataset<List<Row>>

作为输出。此处的行是原始行。

Dataset<<List<Row>> output = dataset.groupBy("key");

如果使用聚合并且collect_list则在输出行中,则无法保证列表格式的值按顺序排列。因此,就我而言,这不是一个好的解决方案。

例如:带聚合的输出。但无法保证设定值的顺序。

+-----+----------------------------+
|item1|set                         |
+-----+----------------------------+
|1    |[[5,3], [4,1], [3,2], [2,2]]|
|2    |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+ 

请告知是否有办法在不使用pojos的情况下将输出Dataset<List<Row>>。(例如基于 pojo 的解决方案:FlatMapGroupsWIthStateFunction (

我有一个解决方案的开始给你。您可以使用monotonically_increasing_id创建索引并"记住"数据帧的顺序。然后,您可以按键分组,使用 collect_list 聚合结果,按索引对列表进行排序,最后将其删除。

在SparkSQL中存在一个sort_array函数来对数组进行排序。不幸的是,我不知道在 sparkSQL 数组上有任何等效的 map 函数来删除索引。这就是为什么我提出了一个基于 UDF 的解决方案:

// the UDF that sorts by the index "i" and keeps the value
val sort_and_strip = udf{ (x : WrappedArray[Row]) =>
    x.sortBy(_.getAs[Long]("i"))
     .map(_.getAs[Long]("value"))
}
// an example of use:
spark.range(7)
    .select('id % 3 as "key", 'id as "value")
    .withColumn("i", monotonically_increasing_id)
    .groupBy("key")
    .agg(collect_list(struct('i, 'value)) as "list")
    .withColumn("list", sort_and_strip('list))
    .show(false)
+---+---------+
|key|list     |
+---+---------+
|0  |[0, 3, 6]|
|1  |[1, 4]   |
|2  |[2, 5]   |
+---+---------+

最新更新