我需要将Kafka Sreams API与Processor API一起使用。我还想在我的处理器实现中将不同类型的对象写入不同的主题,即在进程和标点符号上发出不同的对象。我已经看到有一个KIP-313 flatTransform可能会解决我的问题。
如果我使用:
inputStream.process(processorSupplier,,)
由于这是一个"终止"操作(其返回类型为 void(,我可以在我的处理器中使用内部 Kafka 生产者吗?我还没有看到这样的实现,这是一种合理的方法,有什么副作用吗?
如果您需要如此低级的方法,您可以自己构建整个拓扑:
Topology topology = builder.build();
topology.addSource("inputNode","input");
topology.addProcessor("inProcessor", InputProcessor::new, "inputNode");
topology.addSink("sink1",
(k, v, rc) -> "topic1",
new StringSerializer(),
new IntegerSerializer(),
"inProcessor");
topology.addSink("sink2",
(k, v, rc) -> "topic2",
new StringSerializer(),
new StringSerializer(),
"inProcessor");
输入处理器依赖于业务逻辑生成不同类型的对象,并将它们传递给不同的接收器节点(主题(。
示例示例具有如下逻辑:
- 如果消息的值可以解析为整数,则将其转发到两个接收器节点(sink1,sink2(,转发到sink1作为
Integer
sink2作为String
。 - 如果不转发消息,则仅转发到 sink2。
public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("sink1"));
context().forward(key, value, To.child("sink2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("sink2"));
}
}
}