如何使用卡夫卡时间窗口进行历史聚合



我需要创建每天经过身份验证的用户数的状态存储,这样我就可以获得最后一天、最后7天和最后30天的经过身份验证用户数。为了实现这一点,每个身份验证事件都被发送到身份验证事件主题。我正在流式传输这个主题,并为每天创建窗口。代码:

KStream<String, GenericRecord> authStream = builder.stream("auth-event", Consumed.with(stringSerde, valueSerde)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
.withTimestampExtractor(new TransactionTimestampExtractor()));
authStream 
.groupBy(( String key, GenericRecord value) -> value.get("tenantId").toString(), Grouped.with(Serdes.String(), valueSerde))
.windowedBy(TimeWindows.of(Duration.ofDays(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store")
.withKeySerde(stringSerde)
.withValueSerde(longSerde))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("auth-result-topic", Produced.with(timeWindowedSerdeFrom(String.class), Serdes.Long()));

之后,我将插入有关该主题的记录。此外,我有休息控制器,我正在使用ReadOnlyWindowStore读取存储。day参数是从UI发送的,可以是1天、7天或30天。这意味着我想阅读最后7个窗口。代码:

final ReadOnlyWindowStore<String, Long> dayStore = kafkaStreams.store(KStreamsLdapsExample.authResultTable, QueryableStoreTypes.windowStore());
Instant timeFrom = (Instant.now().minus(Duration.ofDays(days)));
LocalDate currentDate = LocalDate.now();
LocalDateTime currentDayTime = currentDate.atTime(23, 59, 59);
Instant timeTo = Instant.ofEpochSecond(currentDayTime.toEpochSecond(ZoneOffset.UTC));
try(WindowStoreIterator<Long> it1 = dayStore.fetch(tenant, timeFrom, timeTo)) {
Long count = 0L;
JsonObject jsonObject = new JsonObject();
while (it1.hasNext())
{
final KeyValue<Long, Long> next = it1.next();
Date resultDate = new Date(next.key);
jsonObject.addProperty(resultDate.toString(), next.value);
count += next.value;
}
jsonObject.addProperty("tenant", tenant);
jsonObject.addProperty("Total number of events", count);
return ResponseEntity.ok(jsonObject.toString());
}

问题是,我只能在1-2天内得到结果。在那之后,旧窗户就不见了。另一个问题是在输出主题中写入的信息:"auth-result-topic"我正在阅读控制台消费者的结果,有很多空记录,没有键,没有值,还有一些带有随机数的记录。在此处输入图像描述

知道我的店怎么了吗?如何阅读过去的N个窗口?感谢

您需要通过Materialize.as(...).withRetention(...)增加存储保留时间(默认为1天(,您可以将其传递给count()运算符。

您可能还想通过TimeWindows.of(Duration.ofDays(1)).grace(...)增加窗口宽限期。

为了使用控制台使用者读取数据:您需要指定正确的反序列化程序。用于写入输出主题的窗口序列号和长序列号使用二进制格式,而控制台使用者默认采用字符串数据类型。您可以指定相应的命令行参数来设置不同的键和值反序列化程序,这些反序列化程序必须与写入主题时使用的序列化程序相匹配。

相关内容

  • 没有找到相关文章

最新更新