从基于日期窗口的状态存储中获取非压缩键/值



拓扑定义:

KStream<String, JsonNode> transactions = builder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));

KTable<Windowed<String>, JsonNode> aggregation =
transactions
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(windowDuration)).grace(Duration.ofSeconds(windowGraceDuration)))
.aggregate(() -> new Service().buildInitialStats(),
(key, transaction, previous) -> new Service().build(key, transaction, previous),
Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as(statStoreName).withRetention(Duration.ofSeconds((windowDuration + windowGraceDuration + windowRetentionDuration)))
.withKeySerde(Serdes.String())
.withValueSerde(jsonSerde)
.withCacheDisabled())
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
aggregation.toStream()
.to(outputTopic, Produced.with(windowedSerde, jsonSerde));

状态存储API:通过查找所有时间窗口获取密钥。

Instant timeFrom = Instant.ofEpochMilli(0);
Instant timeTo = Instant.now();
WindowStoreIterator<ObjectNode> value = store.fetch(key,timeFrom,timeTo);
while(value.hasNext()){
System.out.println(value.next());
}

作为测试的一部分,执行了2个事务,它产生了密钥1。我的要求是在查找状态存储时,在没有压缩的情况下获得两次密钥1(当前和以前(。Result总是返回带有键和最终聚合值的最终结果。

Txn1->Key-Key1|值-{计数=1,属性="测试"}

Txn2->Key-Key1|值-{计数=2,属性='test1'}

状态存储查找后的当前行为:始终获取值为{Count=2,attribute=‘test1’}的压缩密钥1

相反,我希望获得该窗口持续时间内的所有key1。

作为解决方案的一部分,我做了以下更改,但不幸的是,它没有起作用。

  1. 在拓扑级别禁用缓存
  2. cache.max.bytes.buffering到0
  3. 从内部变更日志主题手动删除压缩策略

可疑的变更日志主题被压缩,从而在调用statestoreapi时获得压缩的密钥。

需要进行哪些更改才能通过状态存储API获得非预期密钥?

如果要获得所有中间结果,则不应使用suppress()运算符。suppress()被设计为每个窗口发出一个结果记录,也就是说,它所做的与您想要的完全相反。

最新更新