KStream窗口聚合



尝试合并多个Kafka流,聚合&产生一个新的话题。然而,在同一窗口中,代码生成的聚合记录与每个输入流中的总输入记录一样多。我希望聚合在联接窗口结束时只产生1个输出。我在下面的代码中做错了什么-

val streams = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
.aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.toStream()
streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))

Kafka Streams默认使用连续更新处理模型。注意,聚合的结果是KTable。该结果表包含每个窗口的一行,每次处理新记录时,窗口(即表中的行(都会更新。

如果调用KTable#toStream(),则会得到表的变更日志流,该流包含表每次更新的记录。

如果每个窗口只想获得一个结果,可以使用suppress()运算符获得第二个KTable,即suppress()获取第一个KTable的变更日志流,并等待直到窗口关闭,然后只将最终结果插入其输出KTable。如果使用suppress(),则应将上游窗口聚合的宽限期(默认为24小时(设置为较低的值,即TimeWindows.of(...).grace(...)

有关更多详细信息,请查看此博客文章:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers

相关内容

  • 没有找到相关文章

最新更新