需要在时间范围内丢弃重复的消息。消息不断传入。波纹管是代码的一部分。
kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.reduce((k,m) -> m)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach((k, v) -> doSomeProcess(k,v));
我在这里做错了什么。我没有看到任何对方法doSomeProcess的调用。消息正在传入。
原来"此功能需要为窗口添加"宽限期"参数" 从 https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.....windowedBy(TimeWindows.of(Duration.ofSeconds(15((.grace(Duration.ofSeconds(5(( (....
这解决了问题。