我正在尝试 http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html 的示例代码
但对于这两行
KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");
根据 https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStreamBuilder.html,现在KStreamBuilder中似乎没有称为"from"的方法
那么目前与上述 2 行相同操作的方法是什么?
Maven 用于管理依赖项
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
此方法已重命名为 stream()
(from() 是旧名称)
KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
这是文档中的错误:https://github.com/apache/kafka/pull/1450
也看看这里: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl
更重要的是,KStreamBuilder
本身现在已被弃用,应该改用StreamsBuilder
。这不仅仅是名称更改,尽管在大多数情况下,最终用户除了类的名称之外,不会立即需要任何其他内容:
通过 DSL 指定拓扑的两个主要类(KStreamBuilder)或处理器API(TopologyBuilder)是已弃用并替换为 StreamsBuilder 和 Topology(两者都是新的类位于包 org.apache.kafka.streams 中)。请注意,那StreamsBuilder 不扩展拓扑,即类层次结构是现在不同了。新类具有与旧的,通过DSL或处理器API构建拓扑。但是,一些在 KStreamBuilder 中公开的内部方法和拓扑生成器,但不是实际 API 的一部分,在新课程不再。