我在一个相对简单的Windowed WordCount示例中遇到了困难。我试图只获得窗口化的结果,但根本没有收到任何结果。
KStream<String, Long> sl = s
...
.groupBy((key, value) -> value)
.windowedBy(of(ofSeconds(5))
.advanceBy(ofSeconds(3))
.grace(ofSeconds(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"counts-store").withRetention(ofSeconds(7)))
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.to(outputTopicName, produced);
我正在输入一些输入:
inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));
但什么也不会被释放。有人知道可能的解决方案吗?
正如你所看到的,我已经在使用优雅和保留,正如其他人所写的那样,这可能会有所帮助,但实际上没有帮助。评论时,抑制行一切正常。
您必须为计数Materialized
视图提供有效的Serdes
,这样Kafka Stream才能正确地为内部抑制处理器提供有效的Window Serdes,如果没有,则该处理器将选择默认的密钥Serdes,这可能会导致序列化无法正常工作,我在KTableSuppressProcessor.buffer()
中得到以下异常:
//please check if you get this exception
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
为Materialized视图counts-store
正确提供有效的Serde
,您应该得到预期的输出:
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
.withRetention(ofSeconds(7))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())