KTable显示整个事件日志,而不是最近的更新(Kafka Streams)



我有一个包含4个事件的主题。原始主题事件:主题数据

以下拓扑应该只打印每个股票的最新更新,但它显示了整个日志流。这是代码:

val builder = new streams.StreamsBuilder
val tableData = builder.table[String, StockData](inputTopic)
tableData.toStream().print(Printed.toSysOut[String, StockData].withLabel("table-form"))
builder.build()

但结果日志看起来像这个

[table-form]: CCC, {"stockVal":10,"times":1234}
[table-form]: BBB, {"stockVal":10,"times":1234}
[table-form]: AAA, {"stockVal":10,"times":1234}
[table-form]: AAA, {"stockVal":20,"times":1240}

为什么我要打印两次AAA股票?当应用程序运行时,所有消息都在主题中,因此据我所知,我应该只得到AAA 的最后一个值

谢谢!

对于任何表,table.toStream()都将输出其当前压缩的数据,以及其中的任何新事件。它不会在日志中进行重复数据消除。

如果您总是想要表的当前状态,那么您需要迭代它的statestore。

请记住,表的原始主题也会临时保存重复的键,直到压缩发生。

相关内容

  • 没有找到相关文章

最新更新