我正在使用Apache Flink的DataSet API,发现sortPartition
转换只支持位置键和表达式键,而不支持键选择器函数。
我使用java.util.Map
作为位置键或表达式键不支持的数据类型。如何在java.util.Map
等数据类型上使用sortPartition
转换?
Flink的键选择器函数是语法糖,可以通过两个映射函数轻松手动实现。
- 第一个
MapFunction
提取密钥字段并返回一个Tuple2<Key, Input>
,其中密钥字段是提取的密钥,输入字段是原始输入(在您的情况下是Map
) - 使用第一个元组字段(索引0)上的位置键对结果数据集进行排序
- 第二个CCD_ 8从CCD_
整体代码看起来像:
DataSet<Map> input = ...
DataSet<Tuple2<Long, Map>> keyed = input.map(new KeyExtractMap());
DataSet<Tuple2<Long, Map>> sortedKeyed = keyed.sortPartition(0, Order.ASCENDING);
DataSet<Map> sorted = sortedKeyed.map(new UnwrapMap());
谢谢,它非常有用,所以我可以使用这种类似的方法来解决排序组数据集上的聚合吗?
例如:
DataSet<Map> input = ...
DataSet<<Tuple4<Long,Long,Long,Map>> keyed = input.map(new KeyExtractMap());
DataSet<<Tuple4<Long,Long,Long,Map>> sortGrouped = keyed.groupBy(0).sortGroup(1,Order.ASCENDING).sortGroup(2,Order.ASCENDING);
DataSet<Map> result = sortGrouped.map(new UnwrapMap());