Suppressed的Kafka Streams聚合:缓存大小对聚合结果的影响



我使用kafka流组件在30分钟的滑动窗口上构建聚合(sum(,宽限期为2分钟。我正在按10000个时间序列(组(的顺序进行处理。聚合使用禁用日志记录的持久状态存储。为了在聚合间隔结束时仅输出最终结果,我使用Suppressed运算符。

大多数时间序列的聚合计算都是正确的,但也有一小部分是不正确的。在这些情况下,聚合值在许多情况下正好反映来自输入流的单个记录值。最初,我使用的聚合启用了默认记录缓存(cache.max.bytes.buffering为10MB(,并启用了Suppressed withNoBound((选项,以根据需要不断分配更多内存。

基于以下帖子建议:

抑制缓冲区内存独立于Streams的记录缓存,因此确保您有足够的堆来承载记录缓存(cache.max.bytes.braining(抑制缓冲区大小

我将记录缓存大小(cache.max.bytes.buffering(从10MB(默认值(增加到100MB,结果的准确性得到了显著提高。然而,我仍然不时发现一些群体的总和计算错误的情况。

我的聚合管道:

@StreamListener
@SendTo("output-aggregated")
public KStream<String, Aggregate> aggregatePipeline(
@Input("input-event") KStream<String, Event> inputEventKStream) {
Duration windowDuration = Duration.ofMinutes(30);
Duration retentionPeriod = windowDuration;
Duration advanceDuration = Duration.ofMinutes(1);
Duration graceDuration = Duration.ofMinutes(2);
// custom state store
WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
retentionPeriod, windowDuration, true);
Materialized<String, Aggregate, WindowStore<Bytes,byte[]>> materializedCustomStore = Materialized.as(timestampedWindowStore);
materializedCustomStore.withKeySerde(Serdes.String()).withLoggingDisabled();
// stream processing
TimeWindowedKStream<String, Event> timeWindowedKStream = inputEventKStream
.filter((key, value) -> isEventMatchingAggregationInterval(value, windowDuration))
.groupBy((key, value) -> Utils.toTimeserieName(value.getSource(), value.getDimensions()))
.windowedBy(TimeWindows.of(windowDuration).advanceBy(advanceDuration).grace(graceDuration));
KTable<Windowed<String>, Aggregate> aggregatedKTable = timeWindowedKStream
.aggregate(
() -> new Aggregate(),
(key, newValue, aggregate) -> {
aggregate.setSum(newValue.getValue() + aggregate.getSum());
aggregate.setCount(aggregate.getCount() + 1);
return aggregate;
},
materializedCustomStore);
return aggregatedKTable
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withNoBound()).withName("suppressed-30m"))
.toStream()
.map((key, value) -> new KeyValue<String, Aggregate>(key.key(), value))
;
}

我的环境:Kafka流:2.5.0,Spring Cloud Hoxton.SR8,Spring Boot 2.3.2

我的问题:

  1. 如何正确调整记录缓存的大小以确保正确计算所有聚合?

  2. 为什么缓存大小会影响最终结果的计算?由于Suppressed是聚合的下一个下游运算符,因此Suppressd运算符不会看到给定窗口的所有中间结果吗?假设缓存已满,这些结果会被刷新?

来自文档:

缓存的语义是,只要commit.interval.ms或cache.max.bytes.buffering(缓存压力(中最早命中,数据就会被刷新到状态存储,并转发到下一个下游处理器节点。commit.interval.ms和cache.max.bytes.buffering都是全局参数

  1. 当缓存大小被充分利用并触发刷新时,是否有触发某些日志的选项?

  2. 您对跟踪记录缓存和抑制缓冲区利用率的最佳指标有什么建议吗?除了";kafka_stream_state_suppression_buffer_size_max"kafka_stream_record_cache_hit_ratio_max";?

非常感谢!

  1. 上述方法的问题出现在以下代码行中:

    WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
    retentionPeriod, windowDuration, **true**);
    

标记为true的标志对应于状态存储的retainDuplicates配置。由于这是真的,所以存储了相同密钥条目的重复条目。缓冲区记录缓存一满,中间聚合结果就被推送到存储中。由于保留了相同密钥的多个中间结果,因此无法正确计算相同密钥的后续聚合操作(即,为同一密钥存储了多个中间成果(。

也就是说,禁用retainDuplicate选项可以解决这个问题,即使记录缓存大小为零也是如此。聚合的最终结果不会受到记录缓存大小的影响。

WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
retentionPeriod, windowDuration, false);
  1. 问题不在于Suppressed运算符,而在于Aggregate运算符使用的状态存储配置错误。

  2. 包org.apache.kafka.streams.processor.internals的调试级别日志提供提交信息。

最新更新