我们有一个可以在多台机器上运行的流拓扑。我们将时间窗口聚合结果存储到状态存储中。 由于状态存储存储本地数据,因此我认为应该在另一个主题上进行聚合以进行整体聚合。 但似乎我错过了一些东西,因为没有一个示例在另一个 KStream 或处理器上进行整体聚合。
我们是否需要使用 groupBy 逻辑来存储整体聚合,或者使用 GlobalKtable 或者只是在某个地方实现我们自己的合并代码?
正确的架构是什么?
在下面的代码中,我试图使用常量键将所有进入处理器的消息分组,以将整体聚合存储在一台机器上,但我认为它会失去 Kafka 提供的并行性。
dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
.filter((key, event) -> event != null && event.getClientCreationDate() != null);
dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
.groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
.windowedBy(timeWindow)
.count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));
在下面的代码中,我尝试使用常量键将所有进入处理器的消息分组,以将整体聚合存储在一台机器上,但我认为它会失去 Kafka 提供的并行性。
这似乎是正确的方法。是的,你失去了并行性,但这就是全局聚合的工作方式。最后,一台机器必须计算它...
不过,您可以改进的是采取两步方法:即,首先通过并行的"随机"键聚合,并使用仅具有一个键的第二步将部分聚合"合并"为单个聚合。这样,计算的某些部分是并行的,只有最后一步(希望减少数据负载(是非并行的。使用 Kafka Streams,您需要"手动"实现此方法。