我在翻转窗口上使用抑制来获得聚合结果。我正在探索直到时间限制和直到窗口关闭抑制。 我不希望我的流在缓冲区已满时关闭。我已经看到这个功能发出EarlyWhenFull(),但它不能在WindowCloses之上适用。 因此,我选择直到TimeLImit与emitEarlyWhenFull(),请参考下面的代码:
groupedStreams.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(() -> initialBlob, blobAggregator,someserde)
.suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5), new StrictBufferConfigImpl().emitEarlyWhenFull()))
.toStream()
就我而言,我正在使用翻滚窗户 5 分钟。因此,每 5 分钟,将为每个记录键打开一个窗口。根据文档,当缓冲区被填充时,将发送最旧的记录。 在相同的翻转窗口中发送旧记录后,具有相同键的新记录会发生什么情况?
例如:消息流: (一,1) (A,2) (A,3) -> 累计结果 : (A,6) .假设这里,缓冲区已满,(A,6)将被发送到下游。假设 (A,4) 现在出现在同一个翻滚窗口中,接下来会发生什么?它会是:(A,10)还是会再次以(A,4)重新开始?
如果suppress()
发出,状态将被保留。因此,对于您的示例,聚合将继续,最终将发出 (A,10)。