我正在窗口流上执行聚合,并希望抑制早期聚合结果。早期结果是指在窗口结束之前计算的结果,而不是在宽限期内发生的结果。因此,我想抑制所有带有时间戳<窗口结束的聚合结果,但在窗口关闭><转发时间戳为>= 窗口结束和时间戳的所有记录。转发时间戳为>
最小 Kafka 流拓扑示例:
new StreamsBuilder()
.stream("my-topic")
.windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
.reduce(myReducer)
.suppress( /* searched for*/ )
.toStream();
因此,Suppressed.untilWindowCloses( .. )
对我来说不是一个选择,因为我必须等到宽限期到期,这可能会很长。
根据 KIP-328,可以使用Suppressed.untilTimeLimit(Duration.ZERO, .. )
as 获得所需的行为(引用自 KIP 的描述(:
a.在发出之前等待更多更新的时间。这是一个时间量,从事件时间(对于常规 KTables(或窗口端(对于窗口 KTables(开始测量,以便在向下游发出每个键之前对其进行缓冲。
然而,Kafka Streams JavaDoc以及相应的实现意味着情况并非如此,时间限制在接收每个(窗口(键的第一条记录时开始倒计时,而不是在窗口结束时开始倒计时。
我很高兴对此进行澄清并支持如何实现预期行为。
KIP 描述不正确(我相应地更新了 wiki 页面(。请注意,再往下看,KIP说:
速率限制更新
假设我们希望将更新速率从 KTable 降低到每个键大约每 30 秒更新一次。我们不想为此使用太多内存,并且我们认为任何时候都不会更新超过 1000 个密钥。
table .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000))) .toStream(); // etc.
因此,使用untilTimeLimit
用于定期发出。对于窗口聚合,间隔计时器将在窗口开始时间启动 - 您仍然可以将等待期设置为"窗口大小"以获得任何"早期"更新,但您不会在窗口结束通过后看到每个更新,而只会看到"窗口大小间隔"中的更新。如果你的宽限期真的很长,这可能还足够好吗?
您描述的用例目前不受支持,但我认为这是一个非常有趣且有用的用例。也许您可以创建一个功能请求票证?