使用 Kafka Stream DSL 写入处理器中的主题



我需要将Kafka Sreams API与Processor API一起使用。我还想在我的处理器实现中将不同类型的对象写入不同的主题,即在进程标点符号上发出不同的对象。我已经看到有一个KIP-313 flatTransform可能会解决我的问题。

如果我使用:

inputStream.process(processorSupplier,,)

由于这是一个"终止"操作(其返回类型为 void(,我可以在我的处理器中使用内部 Kafka 生产者吗?我还没有看到这样的实现,这是一种合理的方法,有什么副作用吗?

如果您需要如此低级的方法,您可以自己构建整个拓扑:

Topology topology = builder.build();
topology.addSource("inputNode","input");
topology.addProcessor("inProcessor", InputProcessor::new, "inputNode");
topology.addSink("sink1",
    (k, v, rc) -> "topic1",
    new StringSerializer(),
    new IntegerSerializer(),
    "inProcessor");
topology.addSink("sink2",
    (k, v, rc) -> "topic2",
    new StringSerializer(),
    new StringSerializer(),
    "inProcessor");

输入处理器依赖于业务逻辑生成不同类型的对象,并将它们传递给不同的接收器节点(主题(。

示例示例具有如下逻辑:

  • 如果消息的值可以解析为整数,则将其转发到两个接收器节点(sink1,sink2(,转发到sink1作为Integer sink2作为String
  • 如果不转发消息,则仅转发到 sink2。
public class InputProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        try {
            context().forward(key, Integer.parseInt(value), To.child("sink1"));
            context().forward(key, value, To.child("sink2"));
        }
        catch (NumberFormatException nfe) {
            context().forward(key, value, To.child("sink2"));
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新