如何对 Kafka 流进行单元测试



在探索如何单元测试Kafka流时,我遇到了ProcessorTopologyTestDriver,不幸的是,这个类似乎在版本0.10.1.0(KAFKA-4408(中被破坏

是否有针对 KTable 问题的解决方法?

我看到了"Mocked Streams"项目,但首先它使用版本0.10.2.0,而我在0.10.1.1,其次它是Scala,而我的测试是Java/Groovy。

这里关于如何在不必引导动物园管理员/kafka 的情况下对流进行单元测试的任何帮助都会很棒。

注意:我确实有使用嵌入式服务器的集成测试,这是用于单元测试,也就是快速、简单的测试。

编辑

谢谢拉蒙·加西亚

对于在Google搜索中到达这里的人,请注意,测试驱动程序类现在是org.apache.kafka.streams.TopologyTestDriver

这个类在 maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils 中

我找到了解决这个问题的方法,我不确定这是答案,尤其是在 https://stackoverflow.com/users/4953079/matthias-j-sax 评论之后。无论如何,分享我到目前为止所拥有的...

我完全复制了 0.10.1 分支中的ProcessorTopologyTestDriver(这是我正在使用的版本(。

为了解决 KAFKA-4408 问题,我使private final MockConsumer<byte[], byte[]> restoreStateConsumer可访问并将块task = new StreamTask(...移动到单独的方法,例如 bootstrap .

在测试的设置阶段,我执行以下操作

driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()

就是这样...

奖金

我还遇到了 KAFKA-4461,幸运的是,由于我复制了整个类,我能够通过细微的调整"挑选"接受的修复程序。

一如既往地感谢反馈。虽然显然不是官方测试类,但事实证明该驱动程序非常有用!

对于在Google搜索中到达这里的人,请注意,测试驱动程序类现在是org.apache.kafka.streams.TopologyTestDriver

这个类在 maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils 中

相关内容

  • 没有找到相关文章

最新更新