作为我在流应用程序中的最后一步,我想对系统中的无序事件进行排序。 为此,我使用了:
events.keyBy((Event event) -> event.id)
.process(new SortFunction())
.print();
其中sort
函数是:
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
private ValueState<PriorityQueue<Event>> queueState = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
}));
queueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10);
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
PriorityQueue<Event> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
Event head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
}
}
}
我现在想做的是试图把它对准化。我目前的想法是执行以下操作:
events.keyBy((Event event) -> event.id)
.rebalance()
.process(new SortFunction()).setParalelism(3)
.map(new KWayMerge()).setParalelism(1).
.print();
如果我理解是正确的,在这种情况下应该发生什么,如果我错了,请纠正我,是给定键的每个事件的一部分(理想情况下是 1/3)将转到SortFunction
的每个并行实例,在这种情况下,要进行完整的排序,我需要创建一个map
, 或另一个processFunction
,它从 3 个不同的实例接收排序的事件并将它们合并在一起。
如果是这种情况,有没有办法区分map
收到的事件的来源,以便我可以在map
上执行 3 向合并?如果这是不可能的,我的下一个想法是将PriorityQueue
换成TreeMap
并将所有内容放入一个窗口中,以便在收到 3 个TreeMaps
后在窗口末尾进行合并。如果选项 a 不可行,或者是否有更好的解决方案来做这样的事情,那么这个其他选项是否有意义?
首先,你应该知道,当你使用基于堆的状态后端时,在 Flink ValueState 中使用 PriorityQueue 或 TreeMap 是一个好主意。在 RocksDB 的情况下,这将执行得非常糟糕,因为 PriorityQueues 将在每次访问时反序列化,并在每次更新时重新序列化。一般来说,我们建议基于 MapState 进行排序,这就是 Flink 库中排序的实现方式。
此代码将执行的操作
events.keyBy((Event event) -> event.id)
.process(new SortFunction())
是逐键独立地对流进行排序 - 输出将相对于每个键进行排序,但不是全局排序。
另一方面,这
events.keyBy((Event event) -> event.id)
.rebalance()
.process(new SortFunction()).setParalelism(3)
将不起作用,因为重新平衡的结果不再是键控流,并且排序函数取决于键控状态。
此外,我不相信执行 3 种 1/3 的流然后合并结果会明显优于单个全局排序。如果需要执行全局排序,则可能需要考虑改用表 API。有关示例,请参阅此处的答案。