Kafka stream aggregate



我有一个kafka流聚合的问题。

我想要的是,对于每个到达输入主题的输入数据,我们都有一个新版本的输出聚合KTable,它被生成,然后连接到第二个主题。

在现实中,我们没有1:1…所以我们没有做足够的连接第二个主题,我们错过了处理。

我确信问题出在聚合上,因为我在一个主题中编写了聚合的输出,我把一个消费者放在主题上:我确实观察到我没有足够的KTable版本正在生成。

我们发现了一些改进的设置:通过使用Kafka流配置的COMMIT_INTERVAL_MS_CONFIG和CACHE_MAX_BYTES_BUFFERING_CONFIG参数,我们有一个更好的处理速率。

使用这些参数是使聚合方法系统地产生一个聚合KTable版本的正确解决方案吗?如果是,应该设置什么值?

提前感谢您的回答。

下面是聚合和连接的代码:
KGroupedStream<String, GenericRecord> groupedEventStream = eventsSource.groupByKey();
KStream<String, String> resultStream =
groupedEventStream.aggregate(this::initSensorAggregatedRecord, this::updateSensorAggregatedRecord).leftJoin(secondSource,
this::bindSecondSource).toStream();

这是我们在kafka流配置上设置的设置:

props.put(COMMIT_INTERVAL_MS_CONFIG, 0);
props.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

的问候CG

如果您想为每个传入记录获得一个新窗口,您应该使用滑动窗口滑动窗口。它们不完全是你想要的,但你可以调整窗口时间,你可能会让它为你工作。

相关内容

  • 没有找到相关文章

最新更新