我有一个情况,我需要检查主题中是否已经存在特定消息,我绝对不需要该主题中的重复项。
任何人都可以提出任何优雅的方法,而不是消耗所有消息并检查它们。
我不认为自己是卡夫卡的专家,但我认为您假装是"反对" Kafka的本质。
但是,我使用java的Kafka Streams库提出了解决方案。基本上,该过程如下:
-
将每个消息映射到一个新的键值中,其中键是上一个键及其值的组合:
(key1, message1) -> (key1-message1, message1)
-
通过此操作,您获得 kgroupedStream 。
-
应用降低功能,将值修改为某些自定义值,例如字符串"重复值"。
-
降低后将所得的ktable转换为kStream并将其推入新的Kafka主题。
以前的解释中有很多假设,我将提供一些代码以示一些启示:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> resources = builder.stream("topic-where-the-messages-are-sent");
KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String, String>(key + "-" + value, value);
}
};
Reducer<String> reducer = new Reducer<String>() {
public String apply(String value1, String value2) {
return "Duplicated message";
}
};
resources.map(kvMapper)
.groupByKey()
.reduce(reducer, "test-store-name")
.toStream()
.to("unique-message-output");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
请记住,这可能不是一个最佳解决方案,也许您不会将其视为解决问题的"优雅"方式。
我希望它有帮助。