我需要一个streamer,需要发送多个消息到相同的主题,但具有不同的kafka头。
我可以像下面这样用~分隔发送消息,但是所有消息都有相同的头。
inputStream
.transformValues(()->new Transformation())
.flatMapValues(value->Arrays.asList(value.split("~")))
.split()
.branch(
(key,value)->key.startsWith("ERR"),
Branched.withConsumer(ks -> ks.to(errorTopic)))
.defaultBranch(Branched.withConsumer(ks -> ks.to(outboundTopic)));
如果你想修改标题,你需要使用process()
步骤。
stream.process(() -> new MyProcessor());
MyProcessor implements api.Processor<...> {
public void process(Record record) {
// access headers via `record.headers()`
// modify with `withHeaders(...)`
context.forward(record); // send result record downstream
}
}