如何使用来自 KStream 输出主题的消息



我正在使用 KStream.to("outputtopic"(写入输出主题;在 apache 文档中提到,它将自动创建传递给 to(( 的主题。如何使用该主题中的消息?

我可以使用 consumer.subscribe(( 来输出主题和轮询消息吗?

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(topic).filterNot((k, v) -> {
            v.toString().contains(tid);
        }).to("outputtopic");
        streams = new KafkaStreams(builder, config);
        streams.start();
        consumer.subscribe(Arrays.asList("outputtopic"));
builder.stream(topic).filterNot((k, v) -> {
            v.toString().contains(tid);
        }) // i.e., without the last `to()` method

这一系列方法的结果是一个 KStream .如果您的问题是关于如何从同一应用程序中继续对生成的KStream进行操作,那么请像这样操作:

KStream<..., ...> myStream = builder.stream(topic).filterNot((k, v) -> {
            v.toString().contains(tid);
        });
myStream.to("outputtopic");
// Then continue to use the `myStream` instance for further work.
myStream.map(....).aggregate(...);

如果你的问题是如何从不同的应用程序读取输出主题,那么你可以通过从另一个 Kafka Streams 应用程序、KSQL、普通 Kafka 消费者(通过订阅(等读取本主题来实现。

相关内容

  • 没有找到相关文章

最新更新