我有一段代码,它遍历 KStream 并检查是否满足条件。在这种情况下,它会调用另一个方法来执行其他一些处理。代码如下所示:
stream1.foreach((k, v) -> {
if (someCondition) {
System.out.println("Triggered Join");
joinStreams();
}
}
});
现在,joinStreams()
方法的主体如下所示(仅用于测试目的(。
private static void joinStreams() {
System.out.println("Started Join");
stream2.foreach((k, v) -> System.out.println("OK"));
}
调用 joinStreams()
时,它仅打印"已开始加入"并永久挂起。当我从main()
直接调用它时,它会打印"已开始加入",然后是与流中的消息一样多的"确定"(这是它的正常行为(。我的问题是:什么可能导致这种奇怪的结果?PS:据我所知,问题出在foreach(stream1
(内部的foreach(来自joinStreams()
(。
你不能那样做。
通过调用类似 stream1.foreach
的东西,您正在构建流拓扑。构建整个拓扑后,您可以从类似 new KafkaStreams(topology, streamingConfig).start()
.
您必须了解,只有在流处理拓扑正在执行(正在运行(时,才会调用foreach
正文。因此,对于您的代码,对stream2.foreach
的调用(用于构建拓扑(发生在构建并启动拓扑之后,这毫无意义。