我想确认我对多个处理器从一个Kafka Stream源读取的效率的理解。如果我想根据谓词逻辑执行两个不同的过程,我相信示例1中的以下内容是最有效的。谓词查看Value(此处为Notification对象(的内容。如果在示例1中,在以下每个处理器中都有一个断点,则会显示为每个传入的Notification调用了每个Function。而在示例2中,只有在满足谓词逻辑的情况下才能调用process2函数。
示例1
@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>> process1() {
return input -> input
.branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1);
}
@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
return input -> input
.filter(PREDICATE_FOR_OUT_2);
.map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}
没有必要进行以下操作并尝试将一个处理器的输出路由到另一个处理器?(甚至不确定是否可能(
示例2(概念(我可能是这样想的,因为我使用的是纯卡夫卡。这里process1有一个3路分支。其中两个分支转到各自的流,然后转到主题,但第三个分支在路由到主题之前需要进一步处理。
@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>[]> process1() {
return input -> input
.branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1, PREDICATE_FOR_OUT_2);
}
我们是否可以将PREDICATE_for_OUT_2的分支路由到进程2中。这意味着只有在PREDICATE_FOR_OUT_2满足时才会调用process2
@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
return input -> input
.map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}
我的想法是,由于Kafka Streams提供的抽象和功能,示例2是多余的(无论如何都不可能(
我认为您的示例的两种情况都可以完成任务,但存在一些差异。在第一个例子中,您有两个函数,都从同一个Kafka主题接收数据,第二个函数在被路由到输出主题之前执行一些额外的逻辑。在第二个示例中,您又有两个函数。在第一个函数中,您有3个分支,每个分支都向一个Kafka主题发送数据(我假设它们是3个不同的主题(。然后在第二个函数中,从第一个函数的第三个输出主题接收数据。在执行了示例2的第二个函数中的逻辑之后,将其发送到此分支的最终目的地。您将为第二个示例介绍一个额外的主题。我认为你的第一个例子更易读、更干净。