我正在尝试解决Kafka的以下问题。有一个话题。让我们称之为SRC-Topic。我不时收到此主题的记录。我想将这些值存储在ktable中,并每10秒发出每10秒存储在DST-TOPIC中的值。当我第一次发出该KTable的值时,我想将1附加到我发出的记录中。随后的每个时间我想将0附加到发射的记录。
我正在寻找适用于此问题的正确的惯用解决方案。我看到的解决方案之一是在我从SRC-TOPIC摄入时会发出1个附加的记录,然后将其存储在ktable中,并附上0个记录。另一个线程将从此KTable读取并定期发射记录。这种方法的问题是它具有种族条件。
任何建议都将不胜感激。
没有直截了当的方法。注意, KTable
是一个changelog stream (它可能在内部具有表状态 - 并非所有ktables都有一个状态 - 但这是实现细节)。
因此,KTable
是流,您不能冲洗流...并且由于状态(如果有的话)是内部的,所以您不能冲洗状态。
您只能通过交互式查询访问状态,该查询也允许进行范围扫描。但是,这不会在下游发出任何内容,而是将数据授予您应用程序的"非流部分"。
我认为,您需要使用低级处理器API来获得所需的结果。