我有下一个错误,实际上我不理解。
o.a.k.s.k.i.KStreamSlidingWindowAggregate - Skipping record for expired window. topic=[...] partition=[0] offset=[16880] timestamp=[1662556875000] window=[1662542475000,1662556875000] expiration=[1662556942000] streamTime=[1662556942000]
streamTime=[166556942000]
时间戳=[166255675000]
stream时间戳=67s
窗口大小为4小时。
宽限期为0
为什么记录被跳过,而我没有得到输出消息?它属于window。是记录故障
更新:在阅读了更多关于kafka流的信息后,我了解到在每条消息上都会创建两个窗口:
- (消息时间窗口(,此窗口包含消息
- (消息时间+窗口(,并且此窗口排除消息
窗口1已过期。窗口2没有。这就是为什么我看不到消息的原因。
但从逻辑上讲,这是错误的,消息属于窗口,但我并没有发出消息。
示例
sliding window time diff = 10, grace = 0
stream time = 0
send message (time = 10, key = 2) -> message key = 2; stream time = 10
send message (time = 4, key = 1) -> no out message;
send message (time = 5, key = 1) -> no out message;
last message belongs to window (stream-time - window-time)
------ restart stream -------
stream time = 0
send message (time = 10, key = 2) -> message key = 2; stream time = 10
send message (time = 4, key = 2) -> 2 message
在Kafka Streams中,滑动窗口是基于事件的。每次新记录进入窗口或从窗口中删除时,都会创建一个新窗口。它由记录时间戳和固定的持续时间定义
每条记录都会创建一个窗口[记录时间戳-持续时间,记录时间戳],
每条删除的记录都会生成一个窗口[记录时间戳+1ms,记录时间印记+1ms+持续时间]
(请注意,其他流处理框架使用完全不同的"滑动窗口"定义(
当stream-time > window-end + grace-period
(https://kafka.apache.org/27/javadoc/org/apache/kafka/streams/kstream/SlidingWindows.html)
对于您的初始示例,宽限期为零,并且您的窗口在流时间之后结束(在记录时间戳处(;因此该记录不包括在该窗口中。
对于第二个例子,我不确定。我的猜测是,密钥=1的记录已过期,因为流时间(10(已超过记录时间(4,5((宽限期=0(。对于关键字为2的记录,为时间戳为10的记录创建一个窗口,并发出同一窗口的更新,因为该记录满足上述条件。但是,由于宽限期为零,因此不会为无序记录创建其他窗口。