使用 groupByKey/windowedBy/count 对 Kafka 流进行单元测试



我的问题类似于:如何对使用会话窗口的 kafka 流应用程序进行单元测试

拓扑看起来像

.filter()
.groupByKey()
.windowedBy(SessionWindows.with(30).grace(5))
.count()
.toStream()
.selectKey((k, v)->k.key())
.to(outTopic)

当我运行此应用程序并发送如下数据时:

key1, {somejson}
key1, {somejson}
key1, {somejson}

在输出主题中,我按预期在 30 秒后正确看到记录

key1, 3

当我为相同的单元测试编写时(在阅读了有关 advancedWallClockTime 的另一个问题后,我的测试代码如下所示:

final Instant now = Instant.now();
// Send messages with one second difference timestamps
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(1000L).toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(2000L).toEpochMilli()));
testDriver.advanceWallClockTime(35000L)

然后我尝试比较结果

ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(lifevalue, Long.valueOf(3));

我希望它是 3,但似乎它总是 1。但是如果我写这样的东西:

List<ProducerRecord<String, Long>> expectedList = Arrays.asList(
new ProducerRecord<String, Long>(outputTopicName, "key1", 1L),
new ProducerRecord<String, Long>(outputTopicName, "key1", 2L),
new ProducerRecord<String, Long>(outputTopicName, "key1", 3L)
);
for (ProducerRecord<String, Long> expected : expectedList) {
ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(expected.value(),  actual.value());
}

然后我的测试通过了。

我做错了什么?最终,我想为两个不同的键添加数据,并测试它们是否都带有计数:3L。

您在测试方面看到的区别在于TopologyTestDriver的工作方式。 首先解释一下 Kafka Streams 如何处理某些上下文的有状态操作可能会有所帮助。

运行 Kafka Streams 应用程序时,来自有状态操作的"真实"记录由内部缓存缓冲。 Kafka 流在满足以下两个条件之一时刷新内部缓存:

提交
  1. 记录(默认提交间隔为 30 秒(
  2. 缓存已满。

根据您上面描述的内容,您可以在流提交消耗的偏移量后观察到 3 的计数。 缓存中的前两条记录被替换,仅发出最后一个计数 3。

现在有了TopologyTestDriver,没有内部缓存;测试驱动程序转发每条记录。因此,您必须为提交的每条记录致电testDriver.readOutput

所以你上面的行

ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());

发出您通过testDriver.pipeInput提供的第一条记录。因为你只给testDriver.readOutput打电话一次。

您将在第二个代码示例中注意到:

for (ProducerRecord<String, Long> expected : expectedList) {
ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(expected.value(),  actual.value());
}

您获得预期的结果是因为您执行testDriver.readOutput次数与输入测试记录的次数相同。

呵呵,

法案

最新更新