Kafka 流 - 抑制直到窗口结束(不关闭)



我正在窗口流上执行聚合,并希望抑制早期聚合结果。早期结果是指在窗口结束之前计算的结果,而不是在宽限期内发生的结果。因此,我想抑制所有带有时间戳<窗口结束的聚合结果,但在窗口关闭><转发时间戳为>= 窗口结束和时间戳的所有记录。

最小 Kafka 流拓扑示例:

new StreamsBuilder()
.stream("my-topic")
.windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
.reduce(myReducer)
.suppress( /* searched for*/ )
.toStream();

因此,Suppressed.untilWindowCloses( .. )对我来说不是一个选择,因为我必须等到宽限期到期,这可能会很长。

根据 KIP-328,可以使用Suppressed.untilTimeLimit(Duration.ZERO, .. )as 获得所需的行为(引用自 KIP 的描述(:

a.在发出之前等待更多更新的时间。这是一个时间量,从事件时间(对于常规 KTables(或窗口端(对于窗口 KTables(开始测量,以便在向下游发出每个键之前对其进行缓冲。

然而,Kafka Streams JavaDoc以及相应的实现意味着情况并非如此,时间限制在接收每个(窗口(键的第一条记录时开始倒计时,而不是在窗口结束时开始倒计时。

我很高兴对此进行澄清并支持如何实现预期行为。

KIP 描述不正确(我相应地更新了 wiki 页面(。请注意,再往下看,KIP说:

速率限制更新

假设我们希望将更新速率从 KTable 降低到每个键大约每 30 秒更新一次。我们不想为此使用太多内存,并且我们认为任何时候都不会更新超过 1000 个密钥。

table
.suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
.toStream(); // etc.

因此,使用untilTimeLimit用于定期发出。对于窗口聚合,间隔计时器将在窗口开始时间启动 - 您仍然可以将等待期设置为"窗口大小"以获得任何"早期"更新,但您不会在窗口结束通过后看到每个更新,而只会看到"窗口大小间隔"中的更新。如果你的宽限期真的很长,这可能还足够好吗?

您描述的用例目前不受支持,但我认为这是一个非常有趣且有用的用例。也许您可以创建一个功能请求票证?

最新更新