是否可以在不预定义@Topic("TopicName")的情况下动态使用Micronaut Kafka消费者



我有以下代码,我想提供;topicName";作为参数或从属性中动态读取它,是否可能是

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class KafkaConsumer {
private final String topicName;
public KafkaConsumer(String topicName) {
this.topicName = topicName;
}
@Topic("topicName")
public void receive(@KafkaKey String day, String message) {
System.out.println("Got Message for the  - " + day + " and Message is  " + message);
}
}

你可以做:

@Topic("${myTopicFromProperties}")

这有点令人困惑,因为如果问题与OP的注释结合在一起"我需要在每个方法调用中使用不同的主题名称,例如从用户那里获取主题名称,然后创建一个监听器">人们可能会根据官方文档想到以下示例/场景:

@Topic({"topic1", "topic2", "topic3}) //multiple topics, see "Specifying Topics" sub-section on the linked page
public void receive(
@KafkaKey String key,
String message,
long offset, 
int partition, 
String topic, // topic as a parameter
long timestamp
) { 
System.out.println("Got message: " + message + " from topic: " + topic);
}

您也可以使用ConsumerRecord并从中获取所有必要的信息:

// "Receiving a ConsumerRecord" sub-section on the linked page
@Topic({"topic1", "topic2", "topic3})
public void receive(ConsumerRecord<String, String> record) { 
System.out.println("Got message: " + record.value() + " from topic: " + record.topic());
}

您还应该能够通过属性占位符指定主题,如在另一个答案(如@Topic({"${topic1}", "${topic2}", "${topic3}"})(中找到的。

附言:上面的例子假设对于每个指定的主题,消息键和消息体都被反序列化为字符串。

最新更新