使用Kafka流和窗口化流生成统计事件,但如果有最近的窗口化事件,则跳过一些窗口化事件



对于过去24小时的事件,我希望生成一个统计事件,以便在某个地方实时显示。因此,当前的行为是:

  1. 每分钟,我聚合最后24小时的事件以添加列表对象
  2. 我临时计算完整列表并生成最终统计对象
  3. 我把这个最后的统计对象推到一个新的话题上

我正在使用Kafka Stream和Spring Boot。

它运行得很好,我有很好的计算能力,该事件在开发中制作得很好。问题是当我在生产中时,源事件主题包含太多数据。

如果我的应用程序停止一天或几分钟。当应用程序重新启动时,我的应用程序会尝试恢复历史记录。卡夫卡流从最后一次偏移开始继续处理,需要花费大量时间来弥补他的延迟。事实上,我不在乎历史。我不需要昨天或最后24小时减去1小时的统计对象,我只想从现在到最后24小时重新计算,就这样。

如果应用程序运行正常,但处理统计事件时会有一些延迟,则也是如此。滞后不断增加。如果延迟变得太重要,我会Kafka Stream自动跳过时间窗口,只计算最后一个。

你认为卡夫卡流能做到吗?提前谢谢。

/**
* Every minute, we collect all events on the last day and we publish a new statistic event.
* 
* @param streamsBuilder
* @return
*/
@Bean
public KStream<String, MySourceEvent> kstreamMySourceEventStatistique(final StreamsBuilder streamsBuilder) {
// We create the stream to consume machine-state topic.
KStream<String, MySourceEvent> kstreamStat = streamsBuilder
.<String, MySourceEvent>stream("my-source-topic", Consumed
.with(Serdes.String(), KafkaUtils
.jsonSerdeForClass(MySourceEvent.class)));
// For this stream, every minute, we take all events in the last 24h, and we aggregate them into TemporaryStatistiqueEvent
KTable<Windowed<String>, TemporaryStatistiqueEvent> aggregatedStream = kstreamStat 
.groupByKey(Grouped
.with(Serdes.String(), KafkaUtils
.jsonSerdeForClass(MySourceEvent.class)))
.windowedBy(TimeWindows
.of(Duration.ofDays(1))
.advanceBy(Duration.ofMinutes(1))
.grace(Duration.ofSeconds(0)))
.<TemporaryStatistiqueEvent>aggregate(() -> new TemporaryStatistiqueEvent(), (key, value, logAgg) -> {
logAgg.add(value); //I add the event in my TemporaryStatistiqueEvent object
return logAgg;
}, Materialized
.<String, TemporaryStatistiqueEvent, WindowStore<Bytes, byte[]>>as("temporary-stats-store")
.withKeySerde(Serdes.String())
.withValueSerde(KafkaUtils.jsonSerdeForClass(TemporaryStatistiqueEvent.class))
.withRetention(Duration.ofDays(1)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
// Now, we gave an aggregate on last 24h, we compute the statistic and push FinalStatisticEvent object in a new topic
aggregatedStream
.toStream()
.map(new KeyValueMapper<Windowed<String>, TemporaryStatistiqueEvent, KeyValue<String, PlcStatMachineState>>() {
@Override
public KeyValue<String, FinalStatisticEvent> apply(final Windowed<String> key, final TemporaryStatistiqueEvent temporaryStatistiqueEvent) {
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(key.window().end()), ZoneOffset.UTC);
return new KeyValue<>(key.key(), temporaryStatistiqueEvent.computeFinalStatisticEvent(zdt));
}
})
.to("final-stat-topic", Produced.with(Serdes.String(), KafkaUtils.jsonSerdeForClass(FinalStatisticEvent .class)));
return kstreamStat;
}

这是一个棘手的问题。。。

对于离线和重新启动的情况,您可以尝试在重新启动应用程序之前操作启动偏移量(即commmitted offset(。使用CCD_ 1;按时间搜索";因此";向前跳到现在-分钟-24小时";。

对于应用程序滞后的情况,则更为复杂。也许你可以有一个";"动态滤波器";(要访问像时间戳这样的记录元数据,您可以使用flatTransformValues来实现过滤器(作为程序的第一步,这会删除太旧的记录吗?

相关内容

  • 没有找到相关文章

最新更新