据我了解,更改窗口聚合的日志主题应至少包含每个窗口的一个密钥/值?
input
.groupByKey() // group by user
.windowedBy(
TimeWindows
.of(Duration.ofSeconds(60))
.advanceBy(Duration.ofSeconds(10))
.grace(Duration.ofSeconds(60)))
.aggregate(
() -> new Aggregate(config),
(userId, msg, aggregate) -> aggregate.addAndReturn(msg),
Materialized
.<String, Aggregate>as(inMemoryWindowStore(
config.getOutputStore(),
Duration.ofSeconds(300),
Duration.ofSeconds(60),
false))
.withCachingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(new MyCustomSerde()));
当我查询状态商店时,我希望每个窗口获得一个密钥/值:
WindowStoreIterator<Aggregate> iter = store.fetch(userId, start, end)
但是我要么什么都没有(迭代器为空(,或者有时比开始端之间的窗口数小。
您不正确地使用store.fetch(key, startTs, endTs)
的参数。两个时间戳startTs
和endTs
do 不请参考单个窗口的开始/结束时间戳,但它是a 时间范围: fetch()
将返回所有带有启动式启动的窗口,包括时间范围。
较旧版本中的Javadocs公平地公平,可能会错过领先。较新的版本改进了Javadocs:https://kafka.apache.org/23/javadoc/org/apache/kafka/kafka/kafka/state/state/readonlywindowstore.htore.html
请注意,参数类型是更改的,并以较新版本重命名:
WindowStoreIterator<V> fetch(K key, Instant from, Instant to) Get all the key-value pairs with the given key and the time range from all the existing windows. This iterator must be closed after use. The time range is inclusive and applies to the starting timestamp of the window.