Spring表达式语言问题



我有以下类。我已经在控制台中验证过,在解析Kafka listener:中的主题占位符值之前,调用了这个类的构造函数(在bean创建期间(

public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements 
MessageReceiver<MSG> {
@SuppressWarnings("unused")
private String topic;
public MsgReceiver(String topic, MessageHandler<MSG> handler) {
super(handler);
this.topic = topic;
}
@KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
System.out.println("Received "+payload);
super.receiveMessage(headers, payload);
}

}

我有我的申请.yml如下:

my:
messenger:
kafka:
address: localhost:9092
topics:
topic_1:
value: my_topic
groupId: 1

在创建bean的过程中;topic_ 1";我希望它应该在Kafka监听器主题占位符中动态使用。我按照代码本身所示进行了尝试,但不起作用。请建议如何做到这一点。

在评估SpEL之前解析占位符;您无法使用SpEL动态构建占位符名称。此外,您不能引用这样的字段;您必须通过bean名称(和一个公共getter(间接地执行此操作。

因此,要做您想要做的事情,您必须添加一个getter,并在使用SpEL构建属性名称后从环境中动态获取属性。

有一个特殊的令牌__listener,它允许您引用当前bean。

把它们放在一起。。。

@SpringBootApplication
public class So63056065Application {
public static void main(String[] args) {
SpringApplication.run(So63056065Application.class, args);
}
@Bean
public MyReceiver receiver() {
return new MyReceiver("topic_1");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
}
}
class MyReceiver {
private final String topic;
public MyReceiver(String topic) {
this.topic = topic;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
public void listen(String in) {
System.out.println(in);
}
}

结果。。。

2020-07-23 12:13:44.932  INFO 39561 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 1
group.instance.id = null
...

1: partitions assigned: [my_topic-0]

最新更新