我正在使用 KStream.to("outputtopic"(写入输出主题;在 apache 文档中提到,它将自动创建传递给 to(( 的主题。如何使用该主题中的消息?
我可以使用 consumer.subscribe(( 来输出主题和轮询消息吗?
KStreamBuilder builder = new KStreamBuilder();
builder.stream(topic).filterNot((k, v) -> {
v.toString().contains(tid);
}).to("outputtopic");
streams = new KafkaStreams(builder, config);
streams.start();
consumer.subscribe(Arrays.asList("outputtopic"));
builder.stream(topic).filterNot((k, v) -> {
v.toString().contains(tid);
}) // i.e., without the last `to()` method
这一系列方法的结果是一个 KStream
.如果您的问题是关于如何从同一应用程序中继续对生成的KStream
进行操作,那么请像这样操作:
KStream<..., ...> myStream = builder.stream(topic).filterNot((k, v) -> {
v.toString().contains(tid);
});
myStream.to("outputtopic");
// Then continue to use the `myStream` instance for further work.
myStream.map(....).aggregate(...);
如果你的问题是如何从不同的应用程序读取输出主题,那么你可以通过从另一个 Kafka Streams 应用程序、KSQL、普通 Kafka 消费者(通过订阅(等读取本主题来实现。