我有一个关于带有Kafka Streams的TimeWindows的问题,有些概念真的让我困惑。
我们有一个主题,每天有1000万个事件,我们有6天的日志保留期,所以总的主题包含6000万个事件。
事实上,只有当前几天的事件对我们来说是有趣的,其余的我们保留了5天,只是出于审计的原因。
现在我从中创建了一个KTable,我正在进行一个load-all操作并迭代事件。正如我之前提到的,实际上我们只关心当天的事件,而不是6000万个事件,所以我在KTable定义中打开了数据窗口。
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
现在,当我用下面的语句加载所有事件时,一切都很好。
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
这个问题是,在一天的早些时候,这将加载let 100万个事件,但稍后会加载1000万个,所以我必须迭代超过1000万个事件。当我们在批处理模式下工作时,我认为我可以进一步优化这一点,并且只加载最后一个小时的事件,所以对于相同的KTable配置,我尝试使用以下语句。
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但令我惊讶的是,这并没有返回任何数据。
有人能解释一下为什么这没有返回任何结果吗,我想我误解了TimeWindow概念中的一些内容。
然后我做了一些进一步的测试,并将KTable配置更改为如下。
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
现在这个查询的功能就像我想要的一样
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但我不确定我走的是正确的道路。。。
如果我将使用以下语句进行最新的KTable配置,这会从当天起为我提供1000万个事件吗?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
在窗口存储上使用交互式查询时,时间范围应用于窗口开始时间戳。因此,如果您有一个1天的窗口,并从[now - 1 hour, now)
查询具有窗口开始时间戳的数据,您将找不到任何匹配的窗口,因为在此时间范围内没有窗口开始。