My DataStream 派生自自定义 SourceFunction,该函数以确定性的顺序发出 WINDOW 大小的字符串序列。 目的是在键控流上创建滑动窗口,以便基于 EventTime 对累积的字符串进行处理。 为了分配事件时间和水印,我将分配者与周期水印附加到流中。 滑动窗口使用自定义 ProcessWindowFunction 进行处理。
env.setStreamTimeCharacteristic(EventTime)
val seqStream = env.addSource(Seqstream)
.assignTimestampsAndWatermarks(SeqTimeStampExtractor())
.keyBy(getEventtimeKey)
.window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))
val result = seqStream.process(ProcessSeqWindow(target1))
我的分配者与周期性水印如下所示:
class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
var waterMark = 9999L
override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
return element.f1
}
override fun getCurrentWatermark(): Watermark? {
waterMark += 1
return Watermark(waterMark)
}
}
换句话说,源发出的每个元素都应该有自己的 EvenTime,并且应该发出 WaterMark 时不允许在该时间内发生进一步的事件。 在调试器中单步执行流,指示按预期生成事件时间/警报
。我的期望是ProcessSeqWindow.run((应该在EventTime上用许多与时间窗口成比例的元素(例如10毫秒(来调用。但是,我观察到的是,run((被多次调用,使用单个元素,并且相对于EventTime以任意顺序调用。 当我强制并行度为 1 时,行为仍然存在。 我的问题是,这是否可能是由每个窗口上的多个触发事件引起的,还是还有其他可能的解释?如何调试原因?
谢谢
水印在作业中的作用是触发关闭 滑动事件时间窗口。为了发挥这个作用 正确地,它们应该基于事件中的时间戳,而不是 比一些任意常数(9999L(。同一对象的原因 负责提取时间戳并提供水印是 以便此对象可以基于其创建的水印 观察事件流中的时间戳。所以除非你的 事件时间戳也基于递增类似的计数器, 这可以解释您看到的一些行为。
另一个问题是,虽然为每个 事件,在定期水印分配器中获取当前水印方法 每 200 毫秒在单独的线程中调用一次(默认情况下(。如果 您希望在每次事件后使用水印,您需要使用 AssignerWithPunctuatedWatermarks,尽管这样做是一种 反模式(因为有那么多水印会增加开销(。
如果您的时间戳完全是人为的,您可能会发现 SlidingCountWindow更适合您正在做的事情。