我使用spring cloud stream api的聚合函数从主题创建了一个物化视图。如下所示:
public void process(KStream<Object, Object> input){
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
然后我用查询我创建的Statestore
ReadOnlyWindowStore<Object, Object> windowStore =
queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());
现在我的问题是,在process方法处理了一个新事件之后,我如何确定这个statestore已经更新?他们的活动是我可以听的还是我可以创造的?
您的程序是:
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
事实上,最后一个aggregate()
返回一个KTable
对象。如果您通过Materialized
禁用缓存,您可以通过了解KTable
的每一次更新
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
.toStream()
.foreach(...) // react to every update to the KTable