我从AbstractStreamOperator和OneInputStreamOperator 编写了自己的运算符扩展
implements OneInputStreamOperator<GenericRecord, Void>
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
...
}
然后我转换我的数据流
OneOperator oneOperator = new OneOperator();
input.transform(oneOperator.getClass().getSimpleName(), Types.VOID, oneOperator)
.setParallelism(1)
.setMaxParallelism(1)
.addSink(new DiscardingSink<>())
.setParallelism(1)
.uid("oneOperator");
这样我就可以使用oneOperator来做一些事情。
在我的单元测试中,我将检查点设置为500ms,autowatermarkInterval设置为10,然后我将2个事件传递到输入中,2个事件之间的时间戳间隔为2小时,这足以触发检查点。
因此,根据我的理解,一旦触发检查点,就会调用notifyCheckpointComplete。
但当我运行单元测试时,notifyCheckpointComplete函数从未被调用。
我错过什么了吗?
谢谢。
检查点与水印无关(大多数情况下(:(因此,当您将检查点设置为500ms时,处理时间将为500ms,因此您注入的数据并不重要。如果测试很快完成,检查点可能不会被触发,因此您可能需要减少检查点时间或添加一些睡眠来等待检查点被触发。