尝试合并多个Kafka流,聚合&产生一个新的话题。然而,在同一窗口中,代码生成的聚合记录与每个输入流中的总输入记录一样多。我希望聚合在联接窗口结束时只产生1个输出。我在下面的代码中做错了什么-
val streams = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
.aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.toStream()
streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))
Kafka Streams默认使用连续更新处理模型。注意,聚合的结果是KTable
。该结果表包含每个窗口的一行,每次处理新记录时,窗口(即表中的行(都会更新。
如果调用KTable#toStream()
,则会得到表的变更日志流,该流包含表每次更新的记录。
如果每个窗口只想获得一个结果,可以使用suppress()
运算符获得第二个KTable
,即suppress()
获取第一个KTable
的变更日志流,并等待直到窗口关闭,然后只将最终结果插入其输出KTable
。如果使用suppress()
,则应将上游窗口聚合的宽限期(默认为24小时(设置为较低的值,即TimeWindows.of(...).grace(...)
。
有关更多详细信息,请查看此博客文章:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers