我的问题类似于:如何对使用会话窗口的 kafka 流应用程序进行单元测试
拓扑看起来像
.filter()
.groupByKey()
.windowedBy(SessionWindows.with(30).grace(5))
.count()
.toStream()
.selectKey((k, v)->k.key())
.to(outTopic)
当我运行此应用程序并发送如下数据时:
key1, {somejson}
key1, {somejson}
key1, {somejson}
在输出主题中,我按预期在 30 秒后正确看到记录
key1, 3
当我为相同的单元测试编写时(在阅读了有关 advancedWallClockTime 的另一个问题后,我的测试代码如下所示:
final Instant now = Instant.now();
// Send messages with one second difference timestamps
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(1000L).toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(2000L).toEpochMilli()));
testDriver.advanceWallClockTime(35000L)
然后我尝试比较结果
ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(lifevalue, Long.valueOf(3));
我希望它是 3,但似乎它总是 1。但是如果我写这样的东西:
List<ProducerRecord<String, Long>> expectedList = Arrays.asList(
new ProducerRecord<String, Long>(outputTopicName, "key1", 1L),
new ProducerRecord<String, Long>(outputTopicName, "key1", 2L),
new ProducerRecord<String, Long>(outputTopicName, "key1", 3L)
);
for (ProducerRecord<String, Long> expected : expectedList) {
ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(expected.value(), actual.value());
}
然后我的测试通过了。
我做错了什么?最终,我想为两个不同的键添加数据,并测试它们是否都带有计数:3L。
您在测试方面看到的区别在于TopologyTestDriver
的工作方式。 首先解释一下 Kafka Streams 如何处理某些上下文的有状态操作可能会有所帮助。
运行 Kafka Streams 应用程序时,来自有状态操作的"真实"记录由内部缓存缓冲。 Kafka 流在满足以下两个条件之一时刷新内部缓存:
提交- 记录(默认提交间隔为 30 秒(
- 缓存已满。
根据您上面描述的内容,您可以在流提交消耗的偏移量后观察到 3 的计数。 缓存中的前两条记录被替换,仅发出最后一个计数 3。
现在有了TopologyTestDriver
,没有内部缓存;测试驱动程序转发每条记录。因此,您必须为提交的每条记录致电testDriver.readOutput
。
所以你上面的行
ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
发出您通过testDriver.pipeInput
提供的第一条记录。因为你只给testDriver.readOutput
打电话一次。
您将在第二个代码示例中注意到:
for (ProducerRecord<String, Long> expected : expectedList) {
ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(expected.value(), actual.value());
}
您获得预期的结果是因为您执行testDriver.readOutput
次数与输入测试记录的次数相同。
呵呵,
法案