KStreamBuilder.from in kafka stream的当前方式是什么?



我正在尝试 http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html 的示例代码

但对于这两行

KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

根据 https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStreamBuilder.html,现在KStreamBuilder中似乎没有称为"from"的方法

那么目前与上述 2 行相同操作的方法是什么?

Maven 用于管理依赖项

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>0.10.0.0</version>
</dependency>

此方法已重命名为 stream()from() 是旧名称)

KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

这是文档中的错误:https://github.com/apache/kafka/pull/1450

也看看这里: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl

更重要的是,KStreamBuilder本身现在已被弃用,应该改用StreamsBuilder。这不仅仅是名称更改,尽管在大多数情况下,最终用户除了类的名称之外,不会立即需要任何其他内容:

通过 DSL 指定拓扑的两个主要类(KStreamBuilder)或处理器API(TopologyBuilder)是已弃用并替换为 StreamsBuilder 和 Topology(两者都是新的类位于包 org.apache.kafka.streams 中)。请注意,那StreamsBuilder 不扩展拓扑,即类层次结构是现在不同了。新类具有与旧的,通过DSL或处理器API构建拓扑。但是,一些在 KStreamBuilder 中公开的内部方法和拓扑生成器,但不是实际 API 的一部分,在新课程不再。

最新更新