在Kafka Streams应用程序中,不再编写第二个输出流



我目前正在实现一个Kafka Streams应用程序,我正在阅读一个主题并进行一些处理。在处理过程中,我将其分为两个流,一个写入一个主题(Avro Schema(,另一个是计数聚合(字数(,将键/值对(String/Long(写入不同的主题。代码之前运行良好,但最近第二个流不再编写。

在此代码示例中:

KStream<String, ProcessedSentence> sentenceKStream = stream
.map((k,v) -> {
[...]
});
// configure serializers for publishing to topic
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false);
stringSerde.configure(serdeConfig, true);
// write to Specific Avro Record
sentenceKStream
.to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
valueProcessedSentence));

句子流(sentenceKStream(写得正确,但单词计数分组出现了问题:

KStream<String, Long> wordCountKStream =
sentenceKStream.flatMap((key, processedSentence) -> {
List<KeyValue<String, Long>> result = new LinkedList<>();
Map<CharSequence, Long> words = processedSentence.getWords();
for (CharSequence word: words.keySet() ) {
result.add(KeyValue.pair(word.toString(), words.get(word)));
}
return result;
})
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream();
// write to Specific Avro Record
wordCountKStream
.to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
Serdes.Long()));

我真的不明白为什么wordCountKStream不再被编写了。

也许有人可以提供一些帮助?我很乐意提供任何进一步的细节!

非常感谢

更新:我发现两个新的输出流中都缺少数据。实际上,所有内容都写得很正确,但在写入数据几分钟后,两个主题中的数据都被删除了(还剩0字节(。

它与实现本身无关。我刚刚使用删除了所有主题偏移

kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]

这解决了问题。存储的偏移量刚刚出现问题,并且在调试过程中与流应用程序的多次重新启动相冲突。

对于那些想列出组以找到存储的主题位置的人,请致电

kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

更新:不幸的是,组偏移量的删除也无法正常工作。实际的问题是,输出主题中新条目的时间戳是来自原始主题(已使用(的时间戳,该时间戳根本没有更改。因此,新条目携带的时间戳早于默认保留时间。

由于消费主题的租金为-1(永远保留数据(,而新主题的标准是,我认为,6天,消费主题中的条目仍然存在,但生产主题中的项目总是被删除,因为它们超过了6天。

最终的解决方案(对我来说(是将输出主题的retention.ms更改为-1,这意味着";永远保持";。这可能不是生产环境的最佳解决方案。

提示:对于Streams应用程序,建议使用应用程序重置工具,而不是如上所示手动重置/删除偏移。

相关内容

  • 没有找到相关文章

最新更新