kafka流滑动agg窗口丢弃无序记录属于没有宽限期的窗口



我有下一个错误,实际上我不理解。

o.a.k.s.k.i.KStreamSlidingWindowAggregate - Skipping record for expired window. topic=[...] partition=[0] offset=[16880] timestamp=[1662556875000] window=[1662542475000,1662556875000] expiration=[1662556942000] streamTime=[1662556942000]

streamTime=[166556942000]

时间戳=[166255675000]

stream时间戳=67s

窗口大小为4小时。

宽限期为0

为什么记录被跳过,而我没有得到输出消息?它属于window。是记录故障

更新:在阅读了更多关于kafka流的信息后,我了解到在每条消息上都会创建两个窗口:

  1. (消息时间窗口(,此窗口包含消息
  2. (消息时间+窗口(,并且此窗口排除消息

窗口1已过期。窗口2没有。这就是为什么我看不到消息的原因。

但从逻辑上讲,这是错误的,消息属于窗口,但我并没有发出消息。

示例

sliding window time diff = 10, grace = 0

stream time = 0
send message (time = 10, key = 2) -> message key = 2; stream time = 10
send message (time = 4, key = 1) -> no out message; 
send message (time = 5, key = 1) -> no out message; 
last message belongs to window (stream-time - window-time)
------ restart stream -------
stream time = 0
send message (time = 10, key = 2) -> message key = 2; stream time = 10
send message (time = 4, key = 2) -> 2 message

在Kafka Streams中,滑动窗口是基于事件的。每次新记录进入窗口或从窗口中删除时,都会创建一个新窗口。它由记录时间戳和固定的持续时间定义
每条记录都会创建一个窗口[记录时间戳-持续时间,记录时间戳],
每条删除的记录都会生成一个窗口[记录时间戳+1ms,记录时间印记+1ms+持续时间]
(请注意,其他流处理框架使用完全不同的"滑动窗口"定义(

stream-time > window-end + grace-period(https://kafka.apache.org/27/javadoc/org/apache/kafka/streams/kstream/SlidingWindows.html)

对于您的初始示例,宽限期为零,并且您的窗口在流时间之后结束(在记录时间戳处(;因此该记录不包括在该窗口中。

对于第二个例子,我不确定。我的猜测是,密钥=1的记录已过期,因为流时间(10(已超过记录时间(4,5((宽限期=0(。对于关键字为2的记录,为时间戳为10的记录创建一个窗口,并发出同一窗口的更新,因为该记录满足上述条件。但是,由于宽限期为零,因此不会为无序记录创建其他窗口。

最新更新