如何使用多个变压器使用相同的主题kafka流



我需要使用多个转换器解析kafka上的复杂消息。每个转换器解析消息的一部分,并通过填充消息上的一些属性来编辑消息。最后,使用Kafka消费者将完全解析的消息存储在数据库中。目前,我正在做这个:

streamsBuilder.stream(Topic.A, someConsumer)
\ filters messages that have unparsed parts of type X
.filter(filterX)
\ transformer that edits the message and produces new Topic.E messages
.transform(ParseXandProduceE::new)
.to(Topic.A, someProducer)
streamsBuilder.stream(Topic.A, someConsumer)
\ filters messages that have unparsed parts of type Y
.filter(filterY)
\ transformer that edits the message and produces new Topic.F messages
.transform(ParseYandProduceF::new)
.to(Topic.A, someProducer)

变压器看起来像:

class ParseXandProduceE implements Transformer<...> {
@Override
public KeyValue<String, Message> transform (String key, Message message) {
message.x = parse(message.rawX);
context.forward(newKey, message.x, Topic.E);
return KeyValue.pair(key, message);
}
}

然而,这很麻烦,因为相同的消息会在这些流中多次流动。此外,还有一个消费者将topic.A的消息存储在数据库中。消息当前存储多次,在每次转换之前和之后。有必要将每条消息存储一次。

以下方法可能有效,但似乎不可取,因为每个filter+transform块都可以干净地放在自己的单独类中:

streamsBuilder.stream(Topic.A, someConsumer)
\ transformer that filters and edits the message and produces new Topic.E + Topic.F messages
.transform(someTransformer)
.to(Topic.B, someProducer)

并使持久性消费者监听CCD_ 2。

后一种建议的解决方案是可行的,还是有其他方法可以达到同样的结果?也许有一个完整的源和接收器拓扑配置?如果是这样的话,在这种情况下会是什么样子?

使用单个转换器似乎是最简单的解决方案。因为你有两个独立的过滤器,如果你试图链接单个操作符,程序会变得更加复杂。如果您知道每条消息只能通过一个过滤器,而不能同时通过两个过滤器,则可以使用branch():

KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});
subStream[0].transform(ParseXandProduceE::new)
.merge(subStream[1].transform(ParseYandProduceF::new)
.to(...)

请注意,只有当两个转换器都不需要转换消息时,上述解决方案才有效(branch()将每个消息放入第一个匹配谓词的分支,但从不放入多个分支(。因此,如果一条消息可以通过两个过滤器,你需要做这样一件更复杂的事情:

KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});
KStream passedX = subStreams[0];
KStream transformedXE = passedX.transform(ParseXandProduceE::new);
// a message that passed filterX may also pass filterY,
// and thus we merge those message back to the "y-stream"
// (of course, those messages would already be transformed by `ParseXandProduceE`)
KStream passedY = subStream[1].merge(transformedXE.filter(filterY);
// the result contains all message that only pass filterX and got transformed,
// plus all messages that passed filterY (and maybe also filterX) and got transformed
KStream result = transformedXE.filterNot(filterY)
.merge(passedY.transform(ParseYandProduceF::new)
result.to(...)

最新更新