我有基于一个非常好、可靠和方便的TopologyTestDriver:的Kafka Streams单元测试
try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(),
streamsConfig(Serdes.String().getClass(), SpecificAvroSerde.class))) {
TestInputTopic<String, Event> inputTopic = testDriver.createInputTopic(inputTopicName,
Serdes.String().serializer(), eventSerde.serializer());
TestOutputTopic<String, Frame> outputWindowTopic = testDriver.createOutputTopic(
outputTopicName, Serdes.String().deserializer(), frameSerde.deserializer());
...
}
我想测试一个更复杂的设置,其中"输出"主题是另一个拓扑的"输入"主题。
我可以在同一拓扑中定义几个输入和输出主题。但是,一旦我在同一拓扑中使用相同的主题作为输入和输出主题,我就会得到以下异常:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic events has already been registered by another source.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.writeToTopology(StreamSourceNode.java:94)
at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:547)
TopologyTestDriver似乎没有提供定义输入输出主题的可能性,是吗?
更新为了更好地说明我正在努力实现的目标:
builder.stream("input-topic, ...)..to("intermediate-topic",...);
builder.stream("intermediate-topic", ...)..to("output-topic",...);
并且我希望能够在我的单元测试中验证(断言("中介主题"的内容。顺便说一句,我不能在构建下一个拓扑部分时"重用"调用".to(("的结果,因为该方法返回void。
但我只有testDriver.createInputTopic()
和testDriver.createOutputTopic()
,没有办法定义像testDriver.createInputOutputTopic()
这样的东西。
使用相同的主题作为输入和输出主题应该可以。但是,不能多次使用同一主题作为输入主题(strack跟踪指示您尝试这样做(。
如果你想两次使用相同的输入主题,你只需要添加一次,然后"扇出":
KStream stream = builder.stream(...);
stream.map(...); // first usage
stream.filter(...); // second usage
两次使用相同的KStream
对象,基本上是一个"扇出"(或"广播"(,将输入数据发送给两个操作员。