我正在使用streamsbuilder的简单API来构建一个可构建的globalkttable:
Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
.withCachingDisabled()
.withKeySerde(Serdes.Long())
.withValueSerde(CATEGORY_JSON_SERDE);
return streamsBuilder.globalTable(categoryTopic, materialized);
我想通过更改通知。它很少更新,如果我想触发缓存无效的更新。卡夫卡的做法是什么?
globalktable不支持这一点。但是,您可以使用"全局商店"并实现每个更新的自定义Processor
。
在内部,GlobalKTable
使用"全局商店"并为您提供Processor
实现。
您可以通过StreamsBuilder#addGlobalStore()
添加全局商店。