在抑制来自窗口 KTable 的输出时,如何正确实现缓冲区配置?



我有一个窗口KTable按预期工作,但每次收到新值时它都会输出。我找到了.suppress运算符,它完全符合我的需求:仅在时间窗口结束时输出结果。我已经向我的TimeWindow中添加了一个grace值,但无法.suppress使用窗口KTable。这个问题的答案显示了 .suppress 应该是什么样子。

在我看来,在阅读 Apache 的文档时,untilWindowCloses是抑制接口的一种方法,这意味着我无法实例化Suppressed对象,对吗?我不确定如何以这种方式实现接口(在窗口KTable上的.suppress参数中(。

我觉得我错过了一些愚蠢的东西,但我搜索了又搜索,无法弄清楚。有什么想法吗?

TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));
final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
.windowedBy(window)
.reduce(new Reducer<GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
//reduce code
}
})
.suppress(Suppressed.untilWindowCloses(unbounded())); //need help here

我正在使用 Eclipse,它告诉我"The method unbounded() is undefined."我做错了什么?

您需要静态导入unbounded()或限定引用。

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

最新更新