我们有一个kafka streams spring-boot应用程序(使用spring-kafka(,该应用程序当前从上游主题读取消息,应用一些转换,并将其写入下游主题,它不进行任何聚合或联接或任何高级kafka流功能。
该代码目前看起来与此相似
@Bean
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SomeObject> {
val stream = streamsBuilder.stream<String, SomeObject>(inputTopicName)
val branches: Array<KStream<String, SomeObject>> = stream.branch(
{ _, value -> isValidRawData(value)},
{ _, failedValue -> true}
)
branches[0].map { _, value -> transform(value) }.to(outputTopicName)
branches[1].foreach { _, value -> s3Service.uploadEvent(value) }
}
这可以很好地使用,但现在我们需要扩展此代码,以使用来自第二个上游主题的不同模式的消息,并应用稍微不同的转换,然后将它们写入与上述拓扑相同的下游主题(具有类似的模式(。
为了实现这一目标,我们有两种选择;
创建第二个
@Bean
工厂方法,该方法与上面的方法几乎相似,只是其拓扑从一个单独的主题消耗并应用不同的转换。修改到上面的拓扑结构以使用这两个主题,为来自第二个主题的消息创建第三个分支,如下所示
@Bean
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SpecificRecord> {
val topics = listOf("topic1", "topic2")
val stream = streamsBuilder.stream<String, SpecificRecord>(topics)
val branches: Array<KStream<String,SpecificRecord>> = stream.branch(
{ _, value -> isRecordFromTopic1(value)},
{ _, value -> isRecordFromTopic2(value)},
{ _, failedValue -> true}
)
branches[0].map { _, value -> transformTopic1Record(value) }.to(outputTopicName)
branches[1].map { _, value -> transformTopic2Record(value) }.to(outputTopicName)
branches[2].foreach { _, value -> s3Service.uploadEvent(value) }
}
以下哪种方法是推荐的方法?从卡夫卡流资源管理或性能的角度来看,我们需要考虑一些事情吗?
谢谢你的建议。
由于在第二段代码中显示了主题集合API,因此我认为这两个变体都是有效的,并且是有意义的。其他一切都只是个人喜好。我会选择第一个,因为从技术上讲,最终所有东西都将在同一个Streams引擎上工作。当您将来引入第三种记录类型等等时,第一种解决方案更容易支持。或者您可能会为特定的流提供额外的逻辑。您可能有一个从所有主题中读取的公共流,并通过该条件和分支分发它们。你可以通过他们自己的中间主题在他们各自的流中完成其余的逻辑。但仍然:只是我的看法。。。