我使用kafka流组件在30分钟的滑动窗口上构建聚合(sum(,宽限期为2分钟。我正在按10000个时间序列(组(的顺序进行处理。聚合使用禁用日志记录的持久状态存储。为了在聚合间隔结束时仅输出最终结果,我使用Suppressed运算符。
大多数时间序列的聚合计算都是正确的,但也有一小部分是不正确的。在这些情况下,聚合值在许多情况下正好反映来自输入流的单个记录值。最初,我使用的聚合启用了默认记录缓存(cache.max.bytes.buffering
为10MB(,并启用了Suppressed withNoBound((选项,以根据需要不断分配更多内存。
基于以下帖子建议:
抑制缓冲区内存独立于Streams的记录缓存,因此确保您有足够的堆来承载记录缓存(cache.max.bytes.braining(抑制缓冲区大小
我将记录缓存大小(cache.max.bytes.buffering
(从10MB(默认值(增加到100MB,结果的准确性得到了显著提高。然而,我仍然不时发现一些群体的总和计算错误的情况。
我的聚合管道:
@StreamListener
@SendTo("output-aggregated")
public KStream<String, Aggregate> aggregatePipeline(
@Input("input-event") KStream<String, Event> inputEventKStream) {
Duration windowDuration = Duration.ofMinutes(30);
Duration retentionPeriod = windowDuration;
Duration advanceDuration = Duration.ofMinutes(1);
Duration graceDuration = Duration.ofMinutes(2);
// custom state store
WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
retentionPeriod, windowDuration, true);
Materialized<String, Aggregate, WindowStore<Bytes,byte[]>> materializedCustomStore = Materialized.as(timestampedWindowStore);
materializedCustomStore.withKeySerde(Serdes.String()).withLoggingDisabled();
// stream processing
TimeWindowedKStream<String, Event> timeWindowedKStream = inputEventKStream
.filter((key, value) -> isEventMatchingAggregationInterval(value, windowDuration))
.groupBy((key, value) -> Utils.toTimeserieName(value.getSource(), value.getDimensions()))
.windowedBy(TimeWindows.of(windowDuration).advanceBy(advanceDuration).grace(graceDuration));
KTable<Windowed<String>, Aggregate> aggregatedKTable = timeWindowedKStream
.aggregate(
() -> new Aggregate(),
(key, newValue, aggregate) -> {
aggregate.setSum(newValue.getValue() + aggregate.getSum());
aggregate.setCount(aggregate.getCount() + 1);
return aggregate;
},
materializedCustomStore);
return aggregatedKTable
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withNoBound()).withName("suppressed-30m"))
.toStream()
.map((key, value) -> new KeyValue<String, Aggregate>(key.key(), value))
;
}
我的环境:Kafka流:2.5.0,Spring Cloud Hoxton.SR8,Spring Boot 2.3.2
我的问题:
如何正确调整记录缓存的大小以确保正确计算所有聚合?
为什么缓存大小会影响最终结果的计算?由于Suppressed是聚合的下一个下游运算符,因此Suppressd运算符不会看到给定窗口的所有中间结果吗?假设缓存已满,这些结果会被刷新?
来自文档:
缓存的语义是,只要commit.interval.ms或cache.max.bytes.buffering(缓存压力(中最早命中,数据就会被刷新到状态存储,并转发到下一个下游处理器节点。commit.interval.ms和cache.max.bytes.buffering都是全局参数
当缓存大小被充分利用并触发刷新时,是否有触发某些日志的选项?
您对跟踪记录缓存和抑制缓冲区利用率的最佳指标有什么建议吗?除了";kafka_stream_state_suppression_buffer_size_max"kafka_stream_record_cache_hit_ratio_max";?
非常感谢!
-
上述方法的问题出现在以下代码行中:
WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m", retentionPeriod, windowDuration, **true**);
标记为true的标志对应于状态存储的retainDuplicates配置。由于这是真的,所以存储了相同密钥条目的重复条目。缓冲区记录缓存一满,中间聚合结果就被推送到存储中。由于保留了相同密钥的多个中间结果,因此无法正确计算相同密钥的后续聚合操作(即,为同一密钥存储了多个中间成果(。
也就是说,禁用retainDuplicate选项可以解决这个问题,即使记录缓存大小为零也是如此。聚合的最终结果不会受到记录缓存大小的影响。
WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
retentionPeriod, windowDuration, false);
问题不在于Suppressed运算符,而在于Aggregate运算符使用的状态存储配置错误。
包org.apache.kafka.streams.processor.internals的调试级别日志提供提交信息。