This question covers how to sort an out-of-order stream using Flink SQL, but I would rather use the DataStream API. One solution is a ProcessFunction that uses a PriorityQueue to buffer events until the watermark indicates they are no longer out of order, but this performs poorly with the RocksDB state backend (the problem is that every access to the PriorityQueue requires ser/de of the entire PriorityQueue). How can I do this efficiently regardless of which state backend is in use?
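
To be concrete, the pattern I mean looks roughly like this (a minimal sketch assuming Long events keyed by a constant for a global sort; the class and state names are just illustrative):

```java
import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// The whole queue lives in a single ValueState entry, so with RocksDB every
// read and update pays for ser/de of the entire queue.
public class PriorityQueueSort extends KeyedProcessFunction<Integer, Long, Long> {

    private transient ValueState<PriorityQueue<Long>> queueState;

    @Override
    public void open(Configuration config) {
        queueState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "sorted-queue", TypeInformation.of(new TypeHint<PriorityQueue<Long>>() {})));
    }

    @Override
    public void processElement(Long event, Context ctx, Collector<Long> out) throws Exception {
        PriorityQueue<Long> queue = queueState.value(); // full deserialization on RocksDB
        if (queue == null) {
            queue = new PriorityQueue<>();
        }
        queue.add(event);
        queueState.update(queue);                       // full serialization on RocksDB
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        PriorityQueue<Long> queue = queueState.value();
        long watermark = ctx.timerService().currentWatermark();
        // release everything the watermark guarantees can no longer be out of order
        while (queue != null && !queue.isEmpty() && queue.peek() <= watermark) {
            out.collect(queue.poll());
        }
        queueState.update(queue);
    }
}
```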
A better approach (and more or less what Flink's SQL and CEP libraries do internally) is to buffer the out-of-order stream in MapState, as follows:
If you want to sort each key separately, first key the stream. Otherwise, for a global sort, key the stream by a constant so that you can use a KeyedProcessFunction to implement the sorting.
In the open method of that process function, instantiate a MapState object, where the keys are timestamps and the values are lists of stream elements that all share the same timestamp.
In the processElement method:
- If the event is late, drop it or send it to a side output
- Otherwise, append the event to the map entry corresponding to its timestamp
- Register an event time timer for this event's timestamp
When onTimer is called, the map entry for that timestamp is ready to be released as part of the sorted stream, because the current watermark now indicates that all earlier events should already have been processed. After sending the events downstream, don't forget to clear that entry from the map.
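
Here is a minimal sketch of such a function, again assuming Long events keyed by a constant for a global sort (the class name, state descriptor, and side-output tag are illustrative choices, not fixed API):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Buffers each event under its timestamp in MapState and releases entries in
// timestamp order as the watermark advances past them.
public class MapStateSort extends KeyedProcessFunction<Integer, Long, Long> {

    public static final OutputTag<Long> LATE_EVENTS = new OutputTag<Long>("lateEvents") {};

    // timestamp -> all buffered events carrying that timestamp
    private transient MapState<Long, List<Long>> buffer;

    @Override
    public void open(Configuration config) {
        buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", Types.LONG, Types.LIST(Types.LONG)));
    }

    @Override
    public void processElement(Long event, Context ctx, Collector<Long> out) throws Exception {
        long ts = ctx.timestamp();
        if (ts <= ctx.timerService().currentWatermark()) {
            // the watermark has already passed this timestamp: the event is late
            ctx.output(LATE_EVENTS, event);
        } else {
            List<Long> sameTimestamp = buffer.get(ts);
            if (sameTimestamp == null) {
                sameTimestamp = new ArrayList<>();
            }
            sameTimestamp.add(event);
            buffer.put(ts, sameTimestamp);
            // fires once the watermark reaches ts, i.e. nothing earlier can still arrive
            ctx.timerService().registerEventTimeTimer(ts);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // all earlier timestamps have already fired, so this entry is next in order
        for (Long event : buffer.get(timestamp)) {
            out.collect(event);
        }
        buffer.remove(timestamp);
    }
}
```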
Unfortunately, the timer-based solution did not work for us: it caused checkpoint failures because of the huge number of timers it created. As an alternative, we sorted within tumbling windows:
```java
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.stream.StreamSupport;

public class EventSortJob {

    private static final Duration ALLOWED_LATENESS = Duration.ofMillis(2);
    private static final Duration SORT_WINDOW_SIZE = Duration.ofMillis(5);

    private static final Logger LOGGER = LoggerFactory.getLogger(EventSortJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> source = env
                .fromElements(0, 1, 2, 10, 9, 8, 3, 5, 4, 7, 6)
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Integer>() {
                            @Override
                            public WatermarkGenerator<Integer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Integer>() {
                                    private long watermark = Long.MIN_VALUE;

                                    // punctuated watermarks are used here for demonstration purposes only!!!
                                    @Override
                                    public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
                                        long potentialWatermark = event - ALLOWED_LATENESS.toMillis(); // delay watermark behind latest timestamp
                                        if (potentialWatermark > watermark) {
                                            watermark = potentialWatermark;
                                            output.emitWatermark(new Watermark(watermark));
                                            LOGGER.info("watermark = {}", watermark);
                                        }
                                    }

                                    // normally, periodic watermarks should be used
                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {}
                                };
                            }

                            @Override
                            public TimestampAssigner<Integer> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                                return (element, recordTimestamp) -> element; // for simplicity, element values are also timestamps (in millis)
                            }
                        }
                );

        OutputTag<Integer> lateEventsTag = new OutputTag<Integer>("lateEventsTag") {};

        SingleOutputStreamOperator<Integer> sorted = source
                .keyBy(v -> 1) // a constant key gives one global sort order
                .window(TumblingEventTimeWindows.of(Time.milliseconds(SORT_WINDOW_SIZE.toMillis())))
                .sideOutputLateData(lateEventsTag)
                .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                    @Override
                    public void process(
                            Integer integer,
                            ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context,
                            Iterable<Integer> elements,
                            Collector<Integer> out
                    ) {
                        // sort the window's buffered elements and emit them in order
                        StreamSupport.stream(elements.spliterator(), false)
                                .sorted()
                                .forEachOrdered(out::collect);
                    }
                });

        source.keyBy(v -> 1).map(v -> String.format("orig: %d", v)).addSink(new PrintSinkFunction<>());
        sorted.addSink(new PrintSinkFunction<>());
        sorted.getSideOutput(lateEventsTag).keyBy(v -> 1).map(v -> String.format("late: %d", v)).addSink(new PrintSinkFunction<>());

        env.execute();
    }
}
```
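
For what it's worth, I believe the window-based variant gets away with far fewer timers because Flink's event time windows register one trigger timer per key and window (repeated registrations for the same timestamp are deduplicated), instead of one timer per distinct event timestamp. The trade-off is that sorted output is only released at window boundaries, so latency is bounded by the window size plus the watermark delay.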