这是问题所在:
假设有一个数字流,我想在 1 小时存储桶上收集这些数字的 MAX,其中我允许给定存储桶最多延迟 3 小时。
这听起来像是翻转窗户的实验室案例。
这是我到目前为止所拥有的:
stream.aggregate(
() -> 0L,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)),
Serdes.Long(),
"my_store"
)
首先,我无法验证这是否确实发生在测试中。时间戳是通过 TimestampExtractor 提取的,我用Thread.sleep
模拟延迟(我将窗口设置为较小的值进行测试),但"延迟记录"仍然被处理而不是丢弃。
在常规窗口上似乎很少(没有?)示例。有一个关于SessionWindows的集成测试,但仅此而已。我是否正确理解了这些概念?
编辑 2
示例 JUnit 测试。由于它相当大,我通过 Gist 分享它。
https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6
编辑(添加更多代码)
数据点具有时间戳(收集数据的时间)、收集数据的计算机的主机名和值。
{
"collectedAt": 12314124134, // timestamp
"hostname": "machine-1",
"reading": 3
}
自定义时间戳提取器用于获取collectedAt
。这是我的管道的更完整表示形式:
source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>"
.groupByKey(Serdes.String(), roundDataSerde)
.aggregate(
() -> RoundData.EMPTY_ROUND,
(aggKey, value, aggregate) -> max(value, aggregate),
TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
.until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
roundDataSerde,
"entries_store"
)
.toStream()
.map(this::simpleRoundDataToAggregate) // Associates record with a key like "<timestamp floored to nearest hour>"
.groupByKey(aggregateSerde, aggregateSerde)
.aggregate(
() -> MyAggregate.EMPTY,
(aggKey, value, aggregate) -> aggregate.merge(value), // I know this is not idempotent, that's a WIP
TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
.until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
aggregateSerde,
"result_store"
)
.print()
测试的一个片段是
Instant roundId = Instant.now().truncatedTo(ChronoUnit.HOURS).minus(9L, ChronoUnit.HOURS);
sendRecord("mytopic", roundId, 3);
sendRecord("mytopic", roundId.plusMillis(15000), 2);
log.info("Waiting a little before sending more usage. (simulating late record)");
Thread.sleep(5000L);
sendRecord("mytopic", roundId.plusMillis(30000), 5);
// Assert stored value is "3".
// It actually is 5 because the last round is accounted for
任何帮助将不胜感激。
我认为Hartimer的自我回答实际上是不正确的。让我试着解释一下发生了什么,至少据我自己所知。:-)
- 延迟到达的数据是根据通过配置的时间戳提取器为应用程序配置的时间语义进行处理的。 在@Hartimer的情况下,这是事件时间(此处使用自定义时间戳提取器)。
- FWIW,在处理时间的情况下,根据定义,没有延迟到达的记录:每条记录都"及时"到达。 "迟到"记录(同样,在此上下文中没有此类记录)包含在当前窗口中,但永远不会重新拟合到较早的窗口中。
- 设置窗口保留时间的调用
TimeWindows#until()
是保留时间的下限。 Kafka 可能会在窗口周围保持比配置的保留时间"更长一点"(我在这里故意模糊,见下文)。 出于这个原因,像@Hartimer这样的严格测试可能不会产生人们直觉期望的结果。
关于窗口保留时间是下限的幕后实际发生的事情有点棘手(可能超出了这个问题的范围),所以我推迟尝试解释这一点,除非有特定的要求我这样做。
更新:此外,问题片段中的这段代码甚至不应该工作,因为它应该抛出一个IllegalArgumentException
:
TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
.until(TimeUnit.SECONDS.toMillis(1L))
要求是,对于它们各自的输入参数,until() >= of()
. 不允许定义大小为 1 小时但保留期仅为 1 秒的窗口(此处的保留期必须为>= 1 小时)。
更新 2:幕后发生的事情是,TimeWindows#until()
的设置用于创建/管理本地窗口存储的段文件。 只要窗口段存在,就会接受该窗口的延迟到达记录。 我将跳过有关如何删除/过期段的部分,因为我真的需要深入研究代码(我不知道
我相信我发现了自己的问题。它归结为TimestampExtractor
以及我用来评估"延迟记录"的值。
在Kafka Stream术语中,有三个"时间"(见这里):
- 事件时间:记录数据的时间
- 处理时间:流处理器接收数据的时间
- 引入时间:(与问题无关)
在我的示例中,我实际上是使用Event-time来确定某些内容是否延迟,但这并不代表延迟记录。收集数据的人都会将此值设置为他们对时间的本地感知(至少在我的用例中)。
重要日期是处理时间。无论事件是何时生成的,我们都需要多长时间才能收到事件。我的聚合已经按"事件时间"处理分组。
我创建了一个新的 Gist,其中包含现在通过的测试的更新版本。添加了额外的字段receivedAt
,模拟"处理时间"。
https://gist.github.com/Hartimer/c79569ad517ab95d08dbe8e84bfa6789