用例:使用EventTime并从Kafka的记录中提取时间戳。
myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
.keyBy("platform")
.window(TumblingEventTimeWindows 5 mins))
.aggregate(AggFunc(), WindowFunc())
.countWindowAll(size)
.apply(someFunc)
.addSink(someSink);
我想要什么:Flink提取时间戳,并在初始间隔(例如20秒(内为每条记录发送水印,然后它可以周期性地发送水印(例如每10秒(。
原因:如果我使用PeriodicWatermark,一开始Flink只会在一段时间后发出水印,并且我的第一个5分钟窗口中的计数是错误的,比随后窗口中的数量大得多。我有一个变通方法,将"自动水印间隔"设置为100ms,但这是非常必要的。
当前,我必须使用AssignerWithPeriodicWatermark或AssignerWith标点符号水印。如何实施这种合并策略?谢谢
在对水印生成器执行异常操作之前,我会仔细检查您是否正确诊断了这种情况。总的来说,事件时间窗口的行为应该是决定性的,如果使用相同的输入,则总是产生相同的结果。如果您得到的第一个窗口的结果因生成水印的频率而异,这表明您可能有延迟事件,当水印更频繁地到达时,这些延迟事件会被丢弃,而当水印不太频繁时,这些事件可以被包括在内。也许你的水印没有正确解释你的活动所经历的实际无序程度?或者您的水印是基于System.currentTimeMillis((,而不是事件时间戳?
此外,第一个时间窗口与其他时间窗口不同是正常的,因为时间窗口与epoch对齐,而不是与第一个事件对齐。当然,这样做的效果是,第一个窗口所覆盖的时间比所有其他窗口都短,因此您应该期望它包含的事件更少,而不是更多。
将setAutoWatermarkInterval设置为100ms是一件非常正常的事情。但如果你真的想避免这种情况,你可以考虑使用AssignerWith标点edWatermarks,它最初为每个事件返回一个水印,然后在适当的时间间隔后,返回水印的频率会降低。
在带标点的水印分配器中,为每个事件调用extractTimestamp和checkAndGetNextWatermark方法。您可以在分配器中使用一些瞬态(非闪烁(状态来跟踪第一个事件的时间或计数事件,并在checkAndGetNextWatermark中使用这些信息来最终回退并停止为每个事件生成水印(有时从checkAndGetNext Watermark返回null,而不是水印(。无论何时重新启动应用程序,它都将始终恢复为为每个事件生成水印。
这不会产生一个具有周期性和间断分配器所有特征的分配器,它只是一个自适应的间断分配器。