在为我的项目实现业务规则时,我需要减少流应用程序产生的事件数量,以节省资源并使处理器尽可能快。我发现Kafka提供了基于RecordTime或WindowEndTime来抑制中间事件的能力。我使用suppress的代码:
KTable<Long, ProductWithMatchRecord> productWithCompetitorMatchKTable = competitorProductMatchWithLinkInfo.groupBy(
(linkMatchProductRecordId, linkMatchWithProduct) -> KeyValue.pair(linkMatchWithProduct.linkMatch().tikiProductId(), linkMatchWithProduct),
Grouped.with(longPayloadJsonSerde, linkMatchWithProductJSONSerde).withName("group-match-record-by-product-id")
).aggregate(
ProductWithMatchRecord::new,
(tikiProductId, linkMatchWithProduct, aggregate) -> aggregate.addLinkMatch(linkMatchWithProduct),
(tikiProductId, linkMatchWithProduct, aggregate) -> aggregate.removeLinkMatch(linkMatchWithProduct),
Named.as("aggregate-match-record-by-product-id"),
Materialized
.<Long, ProductWithMatchRecord, KeyValueStore<Bytes, byte[]>>as("match-record-by-product-id")
.withKeySerde(longPayloadJsonSerde)
.withValueSerde(productWithMatchRecordJSONSerde)
)
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), null));
基本上,它只是一个从其他KTable、聚合、连接、....获取输入的KTable然后Suppress
问题是我期望1个给定键的1个事件,如果在接下来的10秒内没有此键的事件,则productWithCompetitorMatchKTable中的相应数据将被生产出来。然而,在10秒(或更长时间)之后,没有触发给定的事件,直到我为这个键创建另一个事件。请帮助我解决这个问题,或者参考一些文档来源,我可以了解更多关于Kafka流应用程序的抑制功能。
我试着调试和代码,并改变许多配置的抑制。然而,直到timelimit函数,它并没有像我预期的那样工作。
需要新的事件来触发"时间检查"。看一下"标点符号">