下面是我用来测试检查点/保存点的Flink代码示例。
DataStream<Alarm> complexAlarmStream = alarmStream.keyBy(Alarm::getOntId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(config .get(CheckPointTestingHighlevelFunctionParams.ONT_POWEROFF_DETECTION_WINDOW_TIME_IN_MIN))))
.process(new CheckPointTestingHighlevelFunction())
.uid("CheckPointTestingFunction").name("CheckPointTestingFunction");
这里的窗口是一个有状态操作,它在CheckPointTestingHighlevelFunctionParams.ONT_POWEROFF_DETECTION_WINDOW_TIME_IN_MIN
中提到的时间内保存数据。如何给窗口函数一个UID。
您共享的代码正在为窗口操作符设置UID。当指定
时.keyBy(...)
.window(...)
.allowedLateness(...)
.sideOutputLateData(...)
.process(...)
.uid("window-id").name("window-id")
所有这些都有助于定义窗口操作符。