我如何确定在Spring Cloud Stream API中更新了Keystore



我使用spring cloud stream api的聚合函数从主题创建了一个物化视图。如下所示:

public void process(KStream<Object, Object> input){
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)

然后我用查询我创建的Statestore

ReadOnlyWindowStore<Object, Object> windowStore =
queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());

现在我的问题是,在process方法处理了一个新事件之后,我如何确定这个statestore已经更新?他们的活动是我可以听的还是我可以创造的?

您的程序是:

input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)

事实上,最后一个aggregate()返回一个KTable对象。如果您通过Materialized禁用缓存,您可以通过了解KTable的每一次更新

input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
.toStream()
.foreach(...) // react to every update to the KTable

相关内容

  • 没有找到相关文章

最新更新