我最近正在学习Apache Kafka Streams并播放世界计数示例。下面是我的代码
public class StreamsStarterApp {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> wordCountPipe = streamsBuilder.stream("word-count-in");
wordCountPipe.filter((key, value) -> StringUtils.isNoneBlank(value))
.mapValues(value -> value.toLowerCase())
.flatMapValues(value -> Splitter.on(",").trimResults().split(value))
.groupBy((key,value)-> value)
.count(Named.as("count"))
.toStream()
.to("word-count-out", Produced.with(Serdes.String(),Serdes.Long()));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
}
我有一个有趣的观察结果,如果我注释".mapValues(value->value.toLowerCase((",结果会有所不同,这让我非常困惑,因为代码中的任何更改都会导致无法预测的结果更改
发送你好,你好到主题"字数int">
结果将显示你好2
如果我评论".mapValues(value->value.toLowerCase(("并发送你好,世界又来了结果将显示你好1世界第一
这怎么会发生?这与Kafka流内的状态存储有关吗
修改KafkaStreams应用程序(即删除或添加运算符(可能会导致不兼容。通常,如果您想更改程序(cfhttps://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html)。
对于您的特殊情况,问题在于操作员名称。名称是使用内部计数器自动生成的,以避免命名冲突。如果删除一个运算符,则下游运算符的名称将更改。因此,count()
运算符找不到它的旧状态(每个stat存储也有一个名称,存储的名称也会更改(,因此在删除mapValues
后,您将从空状态开始。
您可以通过Topology#describe()
检查命名。这允许您在更改代码之前和之后比较拓扑结构。
为了允许兼容升级,DSL允许您明确指定名称(cfhttps://docs.confluent.io/current/streams/developer-guide/dsl-topology-naming.html)。这样,命名就不会改变。对于字数示例,您可以通过以下方式指定名称:
.count(Materialized.as("myName"))