我有以下代码,我想提供;topicName";作为参数或从属性中动态读取它,是否可能是
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class KafkaConsumer {
private final String topicName;
public KafkaConsumer(String topicName) {
this.topicName = topicName;
}
@Topic("topicName")
public void receive(@KafkaKey String day, String message) {
System.out.println("Got Message for the - " + day + " and Message is " + message);
}
}
你可以做:
@Topic("${myTopicFromProperties}")
这有点令人困惑,因为如果问题与OP的注释结合在一起"我需要在每个方法调用中使用不同的主题名称,例如从用户那里获取主题名称,然后创建一个监听器">人们可能会根据官方文档想到以下示例/场景:
@Topic({"topic1", "topic2", "topic3}) //multiple topics, see "Specifying Topics" sub-section on the linked page
public void receive(
@KafkaKey String key,
String message,
long offset,
int partition,
String topic, // topic as a parameter
long timestamp
) {
System.out.println("Got message: " + message + " from topic: " + topic);
}
您也可以使用ConsumerRecord
并从中获取所有必要的信息:
// "Receiving a ConsumerRecord" sub-section on the linked page
@Topic({"topic1", "topic2", "topic3})
public void receive(ConsumerRecord<String, String> record) {
System.out.println("Got message: " + record.value() + " from topic: " + record.topic());
}
您还应该能够通过属性占位符指定主题,如在另一个答案(如@Topic({"${topic1}", "${topic2}", "${topic3}"})
(中找到的。
附言:上面的例子假设对于每个指定的主题,消息键和消息体都被反序列化为字符串。