Kafka Stream向同一主题发送多个具有不同头的消息



我需要一个streamer,需要发送多个消息到相同的主题,但具有不同的kafka头。

我可以像下面这样用~分隔发送消息,但是所有消息都有相同的头。

inputStream
.transformValues(()->new Transformation())
.flatMapValues(value->Arrays.asList(value.split("~")))
.split()
.branch(
(key,value)->key.startsWith("ERR"),
Branched.withConsumer(ks -> ks.to(errorTopic)))
.defaultBranch(Branched.withConsumer(ks -> ks.to(outboundTopic)));

如果你想修改标题,你需要使用process()步骤。

stream.process(() -> new MyProcessor());
MyProcessor implements api.Processor<...> {
public void process(Record record) {
// access headers via `record.headers()`
// modify with `withHeaders(...)`
context.forward(record); // send result record downstream
}
}

相关内容

最新更新