在探索如何单元测试Kafka流时,我遇到了ProcessorTopologyTestDriver
,不幸的是,这个类似乎在版本0.10.1.0
(KAFKA-4408(中被破坏
是否有针对 KTable 问题的解决方法?
我看到了"Mocked Streams"项目,但首先它使用版本0.10.2.0
,而我在0.10.1.1
,其次它是Scala,而我的测试是Java/Groovy。
这里关于如何在不必引导动物园管理员/kafka 的情况下对流进行单元测试的任何帮助都会很棒。
注意:我确实有使用嵌入式服务器的集成测试,这是用于单元测试,也就是快速、简单的测试。
编辑
谢谢拉蒙·加西亚
对于在Google搜索中到达这里的人,请注意,测试驱动程序类现在是org.apache.kafka.streams.TopologyTestDriver
这个类在 maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils 中
我找到了解决这个问题的方法,我不确定这是答案,尤其是在 https://stackoverflow.com/users/4953079/matthias-j-sax 评论之后。无论如何,分享我到目前为止所拥有的...
我完全复制了 0.10.1 分支中的ProcessorTopologyTestDriver
(这是我正在使用的版本(。
为了解决 KAFKA-4408 问题,我使private final MockConsumer<byte[], byte[]> restoreStateConsumer
可访问并将块task = new StreamTask(...
移动到单独的方法,例如 bootstrap
.
在测试的设置阶段,我执行以下操作
driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()
就是这样...
奖金
我还遇到了 KAFKA-4461,幸运的是,由于我复制了整个类,我能够通过细微的调整"挑选"接受的修复程序。
一如既往地感谢反馈。虽然显然不是官方测试类,但事实证明该驱动程序非常有用!
对于在Google搜索中到达这里的人,请注意,测试驱动程序类现在是org.apache.kafka.streams.TopologyTestDriver
这个类在 maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils 中