(卡夫卡)流 - 在每个内部



我有一段代码,它遍历 KStream 并检查是否满足条件。在这种情况下,它会调用另一个方法来执行其他一些处理。代码如下所示:

stream1.foreach((k, v) -> {
        if (someCondition) {
            System.out.println("Triggered Join");
            joinStreams();
        }
    }
});

现在,joinStreams()方法的主体如下所示(仅用于测试目的(。

private static void joinStreams() {
    System.out.println("Started Join");
    stream2.foreach((k, v) -> System.out.println("OK"));
}

调用 joinStreams() 时,它仅打印"已开始加入"并永久挂起。当我从main()直接调用它时,它会打印"已开始加入",然后是与流中的消息一样多的"确定"(这是它的正常行为(。我的问题是:什么可能导致这种奇怪的结果?PS:据我所知,问题出在foreach(stream1(内部的foreach(来自joinStreams()(。

你不能那样做。

通过调用类似 stream1.foreach 的东西,您正在构建流拓扑。构建整个拓扑后,您可以从类似 new KafkaStreams(topology, streamingConfig).start() .

您必须了解,只有在流处理拓扑正在执行(正在运行(时,才会调用foreach正文。因此,对于您的代码,对stream2.foreach的调用(用于构建拓扑(发生在构建并启动拓扑之后,这毫无意义。

相关内容

  • 没有找到相关文章

最新更新