我试图通过各种教程,但在kafka流的两个方面尚不清楚。让我们以以下提到的单词计数示例:https://docs.confluent.io/current/streams/quickstart.html
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words. The text lines are the message
// values, i.e. we can ignore whatever data is in the message keys and thus invoke
// `flatMapValues` instead of the more generic `flatMap`.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
// We use `groupBy` to ensure the words are available as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count();
// Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
wordCounts.toStream().to("streams-wordcount-output",
Produced.with(stringSerde, longSerde));
这里有几个问题:
1.(由于原始流中没有钥匙,因此两个单词可以在两个不同的节点上降落,因为它们可能会属于不同的分区,因此真正的数量将是它们两个的汇总。似乎在这里没有完成吗?在此处提供同一主题分区的不同节点是否在此处协调以汇总计数?
2.(当每个操作生成新流(例如FlatMapValues,GroupBy等(时,这些分区是针对这些子流中的消息重新计算的,以便它们在不同的节点上降落?
将感谢这里的任何帮助!
1。(由于原始流中没有密钥,因此两个单词可以在两个不同的节点上降落,因为它们可能会属于不同的分区,因此,真正的数量将是它们两个的聚集。似乎在这里没有完成吗?
在这里完成。这是相关代码:
// We use `groupBy` to ensure the words are available as message keys
.groupBy((key, value) -> value)
在这里,"单词"成为新的消息密钥,这意味着单词被重新分配,以便每个单词仅放入一个分区中。
在此处提供同一主题分区的不同节点在此处协调以汇总计数?
不,他们没有。一个分区仅由一个节点处理(更准确地说:仅通过一个流任务,请参见下文(。
2。(由于每个操作都会生成新流(例如FlatMapValues,groupby等(,这些分区是针对这些子流中的消息重新计算的,以便它们在不同的节点上降落吗?
?
不确定我理解您的问题,尤其是"重新固定"评论。操作(如聚合(总是每个分区执行,而Kafka流映射分区以流到流任务(稍微简化:一个分区总是由一个流任务处理(。流任务由Kafka Streams应用程序的各种实例执行,该应用程序通常在不同的容器/VMS/机器上运行。如果需要的话,需要重新分配数据(请参阅上面的问题1并回答(,以产生预期的结果 - 也许这就是您说"重新计算"时的意思。
我建议阅读kafka的文档,例如https://kafka.apache.org/documentation/streams/architecture#streams_architecture_tasks.