Kafka Streams :单个应用程序中的多个拓扑



我想在单个Kafka流应用程序中同时使用处理器API和DSL。此外,如何在单个应用程序中构建和运行多个拓扑(例如1使用处理器API,其他使用DSL。

您可以轻松地混合使用DSL和处理器API。我如何理解您希望使用这两种方法构建处理图,对于DSL,您可以调用StreamsBuilder::stream,对于处理器API,您可以调用StreamsBuilder::build()以获取Topology,然后应用函数来添加处理器等。

源代码将是这样的:

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

编辑1:

您可以使用 DSL 构建两个拓扑,并行运行并侦听不同的主题。它可以像@Matthias J. Sax提到的KStream::transform(...)KStream::transformValues(...)KStream::process(...)那样完成。代码将是这样的:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);

相关内容

  • 没有找到相关文章

最新更新