我想在单个Kafka流应用程序中同时使用处理器API和DSL。此外,如何在单个应用程序中构建和运行多个拓扑(例如1使用处理器API,其他使用DSL。
您可以轻松地混合使用DSL和处理器API。我如何理解您希望使用这两种方法构建处理图,对于DSL,您可以调用StreamsBuilder::stream
,对于处理器API,您可以调用StreamsBuilder::build()
以获取Topology
,然后应用函数来添加处理器等。
源代码将是这样的:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
编辑1:
您可以使用 DSL 构建两个拓扑,并行运行并侦听不同的主题。它可以像@Matthias J. Sax提到的KStream::transform(...)
、KStream::transformValues(...)
和KStream::process(...)
那样完成。代码将是这样的:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);