KafkStreams:在窗口期间丢弃消息



需要在时间范围内丢弃重复的消息。消息不断传入。波纹管是代码的一部分。

 kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
             .reduce((k,m) -> m)
             .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
             .toStream()
             .foreach((k, v) -> doSomeProcess(k,v));

我在这里做错了什么。我没有看到任何对方法doSomeProcess的调用。消息正在传入。

原来"此功能需要为窗口添加"宽限期"参数" 从 https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.....windowedBy(TimeWindows.of(Duration.ofSeconds(15((.grace(Duration.ofSeconds(5(( (....

这解决了问题。

相关内容

  • 没有找到相关文章

最新更新