我们正在接收来自独立数据源的事件,因此,到达我们的 Flink 拓扑(通过 Kafka)的数据将是无序的。
我们正在 Flink 拓扑中创建 1 分钟事件时间窗口,并在源运算符处生成事件时间水印(当前事件时间 - 某个阈值(30 秒))。
如果一些事件在设置的阈值之后到达,则简单地忽略这些事件(在我们的例子中没关系,因为属于该分钟的大多数事件都已经到达并在相应的窗口中进行处理)。
现在,问题是,如果程序崩溃(无论出于何种原因),然后从最后一个成功的检查点再次恢复,无序到达事件将触发过去(已处理的)窗口的执行(该窗口中只有极少数事件)覆盖该窗口的先前计算结果。
如果 Flink 有检查点事件时间水印,就不会发生此问题。
所以,我想知道是否有办法在 Flink 中强制执行事件时间水印的检查点......
我认为最简单的解决方案是在窗口运算符之后注入ProcessFunction
。
ProcessFunction
可以通过其Context
对象访问当前水印,并可以将其存储在联合运算符状态。 如果发生故障,ProcessFunction
会从其状态中恢复水印,并过滤时间戳小于水印的所有记录(时间戳也可以通过Context
obejct 访问)。
虽然这是一个老问题,但我也有同样的问题。应用程序正在重新启动,具有事件时间窗口的联接函数不再触发,因为其中一个流中的事件在崩溃之前完成。联接可以恢复状态,但由于其中一个流中不再有水印,因此事件在重新启动后永远不会加入。
我找到的解决方案是在源运算符之后为最新水印创建一个检查点。由于没有 UDF 来持久化水印快照,我不得不创建自己的运算符,该运算符不会更改事件(标识函数)并将最新的水印保存为其状态。当 Flink 从崩溃中恢复时,WatermarkStreamOperator.initializeState()
会在行processWatermark(new Watermark(maxWatermark))
的ListState<Long> latestWatermark
上发出最后一个水印检查点。然后可以触发与事件时间窗口的联接。
public class WatermarkStreamOperator<IN> extends AbstractUdfStreamOperator<IN, WatermarkFunction<IN>>
implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
private ListState<Long> latestWatermark;
public WatermarkStreamOperator(WatermarkFunction<IN> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void initializeState(StateInitializationContext context) throws Exception { System.out.println("WatermarkStreamOperator.initializeState");
super.initializeState(context);
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("latest-watermark", Long.class);
latestWatermark = context.getOperatorStateStore().getListState(descriptor);
List<Long> watermarkList = new ArrayList<>();
latestWatermark.get().forEach(watermarkList::add);
Long maxWatermark = watermarkList.stream().max(Long::compare).orElse(0L);
if (!maxWatermark.equals(Long.valueOf(0l))) {
System.out.println("watermarkList recovered max: " + maxWatermark);
processWatermark(new Watermark(maxWatermark));
}
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
System.out.println("processing watermark: " + mark.getTimestamp()); latestWatermark.update(Arrays.asList(mark.getTimestamp()));
super.processWatermark(mark);
}
}
以及运算符的标识 UDF:
public interface WatermarkFunction<T> extends Function, Serializable {
T process(T value) throws Exception;
}
最后,我使用.transform()
用MyTupleWatermarkFunc
来调用我的WatermarkStreamOperator
。
DataStream<Tuple2<String, Integer>> dataStream = env
.addSource(new MySource(sentence))
.transform("myStatefulWatermarkOperator",
TypeInformation.of(String.class),
new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()))
...
...
public class MyTupleWatermarkFunc implements WatermarkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public String process(String value) throws Exception {
return value;
}
}
以下是我为此创建的单元和模拟测试 https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java