I'm having trouble understanding the basic Kafka Streams example:
https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys). The default key and value serdes will be used.
final KStream<String, String> textLines = builder.stream(inputTopic);
final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
final KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words. The text lines are the record
// values, i.e. we can ignore whatever data is in the record keys and thus invoke
// `flatMapValues()` instead of the more generic `flatMap()`.
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
// Group the split data by word so that we can subsequently count the occurrences per word.
// This step re-keys (re-partitions) the input data, with the new record key being the words.
// Note: No need to specify explicit serdes because the resulting key and value types
// (String and String) match the application's default serdes.
.groupBy((keyIgnored, word) -> word)
// Count the occurrences of each word (record key).
.count();
// Write the `KTable<String, Long>` to the output topic.
wordCounts.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
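Can someone explain the .flatMapValues part?
From what I can see, flatMapValues converts a KStream<String, String> into a KStream<String, List<String>>.
So how does the subsequent .groupBy in the chain end up with String, String as its input parameters?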
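.flatMap is an operator that, when the function passed to it returns a collection, "flattens" that collection and passes its elements one at a time to the next operator. .flatMapValues does the same for the record values only: each word in the returned list becomes its own record with the original key, so the result is a KStream<String, String>, not a KStream<String, List<String>>, and .groupBy receives a (String, String) pair for every word.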
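To see the flattening without a Kafka cluster, here is a minimal sketch that uses java.util.stream as an analogy. It is not Kafka Streams code: the class FlattenSketch and its two methods are hypothetical names made up for this illustration, and only the regex is taken from the example above. Stream.map keeps one list per line, while Stream.flatMap hands the words on one by one, which is the behaviour flatMapValues has on record values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper class; plain Java streams used as an analogy for flatMapValues().
class FlattenSketch {
    private static final Pattern PATTERN =
        Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // map(): one output element per input line, so the result is a list of lists.
    static List<List<String>> splitWithoutFlattening(List<String> lines) {
        return lines.stream()
            .map(line -> Arrays.asList(PATTERN.split(line.toLowerCase())))
            .collect(Collectors.toList());
    }

    // flatMap(): the lambda still yields several words per line,
    // but downstream steps see the individual words, not the collection.
    static List<String> splitAndFlatten(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(PATTERN.split(line.toLowerCase())))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Kafka", "hello streams");
        System.out.println(splitWithoutFlattening(lines)); // [[hello, kafka], [hello, streams]]
        System.out.println(splitAndFlatten(lines));        // [hello, kafka, hello, streams]
    }
}
```

In the Kafka Streams example the second shape is what reaches .groupBy: one String value per record, which is why the lambda there takes (keyIgnored, word) rather than a list of words.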