单元测试使用KStream加入的Kafka拓扑



我有一个拓扑结合了两个kStream连接,我面临的问题是,当试图用Toopology -TestDriver对单位测试进行单位测试时,将几个带有管道输入的消费记录和读取图。似乎没有起作用。

我认为这可能是因为连接使用了我们在测试中不使用的实际kafka中的内部rocksdb。

所以我一直在寻找解决方案,但找不到任何解决方案。

注意:删除kStream-kstream加入时,这种测试方法非常有效。

我的拓扑结合了两个kStream加入,我面临的问题是,在尝试使用Toopology -Testdriver进行单位测试时,将几个带有管道输入的消费记录发送,然后再进行读取。似乎没有起作用。

通过设计,但不幸的是,在您的情况下,TopologyTestDriver并不是Kafka Streams引擎在运行时工作方式的100%精确模型。值得注意的是,新事件的处理顺序存在一些差异。

这确实在尝试测试时确实会引起问题,例如某些连接,因为这些操作取决于某个处理顺序(例如,在流台上的加入中,该表应该已经在钥匙"爱丽丝"的条目中有一个条目"爱丽丝"的流端事件到达,否则流端"爱丽丝"的联接输出将不包括任何桌面数据)。

所以我一直在寻找解决方案,但找不到任何解决方案。

我建议的是使用旋转嵌入式Kafka群集的测试,然后使用" Real" Kafka Streams Engine(即TopologyTestDriver)对该群集运行测试。有效地,这意味着您将测试从单位测试更改为集成/系统测试:您的测试将启动一个完整的Kafka流拓扑,该拓扑与嵌入式KAFKA群集对话,该群集在与您的测试的同一机器上运行。

请参阅Apache Kafka项目中KAFKA流的集成测试,其中EmbeddedKafkaClusterIntegrationTestUtils是工具的中心部分。连接的具体测试示例是StreamTableJoinIntegrationTest(有一些相关的集成测试)及其父 AbstractJoinIntegrationTest。(就其价值而言,在https://github.com/confluentinc/kafka-streams-exampleas-exampleas-xamples-integration-tests上有进一步的集成测试示例数据格式等)

但是,除非我错误地错误,否则集成测试及其工具不包含在Kafka流的测试实用工具中(即org.apache.kafka:kafka-streams-test-utils)。因此,您必须在自己的代码库中进行一些复制。

您是否看过Kafka流单元测试[1]?这是关于数据中的管道并使用模拟处理器检查最终结果。

例如,以下流连接:

        stream1 = builder.stream(topic1, consumed);
        stream2 = builder.stream(topic2, consumed);
        joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(ofMillis(100)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

您可以开始将输入项目输入到第一个或第二个主题中,并使用每个连续的输入管道检查处理器可以检查的内容:

// push two items to the primary stream; the other window is empty
            // w1 = {}
            // w2 = {}
            // --> w1 = { 0:A0, 1:A1 }
            //     w2 = {}
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(EMPTY);
            // push two items to the other stream; this should produce two items
            // w1 = { 0:A0, 1:A1 }
            // w2 = {}
            // --> w1 = { 0:A0, 1:A1 }
            //     w2 = { 0:a0, 1:a1 }
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
                new KeyValueTimestamp<>(1, "A1+a1", 0));

我希望这会有所帮助。

参考:[1] https://github.com/apache/kafka/kafka/blob/trunk/streams/src/src/test/java/java/opache/kaface/kafka/kafka/kafka/kafka/kerams/kstreams/streams/streams/kstreams/kstreams/kstreamkointestestest.java#l279

l279 Div>

最新更新