我正在浏览一些使用Kafka流的基本示例。我注意到没有设置钥匙。我正试图为每条记录设置一个唯一的密钥。一行文本被拆分为多个记录——一个记录就是行中的一个单词。我目前拥有的脚本为每一条记录都附加了相同的密钥。我希望每一张唱片都有一把独一无二的钥匙,但我看不出有什么好地方可以放。非常感谢你的帮助。这是我现在拥有的:
KStream<String, String> source = builder.stream(INPUT_TOPIC);
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String line) {
return Arrays.asList(line.split("\W+"));
}
});
Long key = (long)(Math.random()*9000)+1000;
KStream<String, String> wordsKeyed = words.map((k, v) -> new KeyValue<>("xyx"+key, v));
wordsKeyed.to(OUTPUT_TOPIC);
它附加了相同的键,因为您只计算一个随机值。
如果您希望每个密钥都是随机的,请使用new KeyValue<>("xyx" + (Math.random()*9000+1000), ...)
如果您希望每个密钥都是唯一的(因为随机数可能重叠(,请使用AtomicInteger
和incrementAndGet()