Apache Kafka:检查主题中消息的存在



我有一个情况,我需要检查主题中是否已经存在特定消息,我绝对不需要该主题中的重复项。

任何人都可以提出任何优雅的方法,而不是消耗所有消息并检查它们。

我不认为自己是卡夫卡的专家,但我认为您假装是"反对" Kafka的本质。

但是,我使用java的Kafka Streams库提出了解决方案。基本上,该过程如下:

  • 将每个消息映射到一个新的键值中,其中键是上一个键及其值的组合: (key1, message1) -> (key1-message1, message1)

  • 通过此操作,您获得 kgroupedStream

  • 应用降低功能,将值修改为某些自定义值,例如字符串"重复值"。

  • 降低后将所得的ktable转换为kStream并将其推入新的Kafka主题。

以前的解释中有很多假设,我将提供一些代码以示一些启示:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> resources =  builder.stream("topic-where-the-messages-are-sent");
KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
    public KeyValue<String, String> apply(String key, String value) {
        return new KeyValue<String, String>(key + "-" + value, value);
    }
};
Reducer<String> reducer = new Reducer<String>() {
    public String apply(String value1, String value2) {
        return "Duplicated message";
    }
};
resources.map(kvMapper)
    .groupByKey()
    .reduce(reducer, "test-store-name")
    .toStream()
    .to("unique-message-output");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

请记住,这可能不是一个最佳解决方案,也许您不会将其视为解决问题的"优雅"方式。

我希望它有帮助。

相关内容

  • 没有找到相关文章

最新更新