Spring AMQP 动态创建 RabbitTemplate 和 SimpleMessageListenerConta



我正在尝试在Spring Boot中实现一个网关,具有REST端点并将消息插入RabbitMQ代理中。我需要处理错误,所以我配置了一个带有DLQ的回复地址,以及一个带有RabbitTemplate的SimpleMessageListenerContainer,以将其标记为"侦听器"并能够使用replyQueue。

它适用于"硬编码"豆子:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setReceiveTimeout(0);
template.setReplyTimeout(10000);
template.setExchange("inputExchange");
template.setRoutingKey("routing.1");
template.setReplyAddress("replyQueue1");
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
classMapper.setDefaultType(Event.class);
messageConverter.setClassMapper(classMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("replyQueue1");
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}

但是这个网关的目标是完全可配置的,所以不要把每条路由都编码到兔子交换/队列。

例如,我在 yaml 中有此配置:

routes:
service1:
exchange: inputExchange
queue: inputQueue1
routing: routing.1
replyQueue: replyQueue1
dlExchange: reply.dlx1
dlQueue: dlx.queue1.reply
receiveTimeout: 0
replyTimeout: 10000
preProcessors: package.processor.LowercaseProcessor
postProcessors: package.processor.UppercaseProcessor
service2:
exchange: inputExchange
queue: inputQueue2
routing: routing.2

所以我需要动态创建我的RabbitTemplate和SimpleMessageListenerContainer,为每个服务配置replyQueue,DLQ,...

我尝试使用此代码:

@Configuration
public class RabbitTemplatesConfiguration implements BeanFactoryAware {
@Autowired
private GatewayProperties properties;
@Autowired
private ConnectionFactory connectionFactory;
private BeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
@PostConstruct
public void configure() {
Assert.state(beanFactory instanceof ConfigurableBeanFactory, "wrong bean factory type");
ConfigurableBeanFactory configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
Map<String, ServiceProperties> routes = properties.getRoutes();
if (routes != null) {
for (String service : routes.keySet()) {
ServiceProperties props = routes.get(service);
createTemplate(configurableBeanFactory, service, props);
}
}
}
private void createTemplate(ConfigurableBeanFactory configurableBeanFactory, String service, ServiceProperties props) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange(props.getExchange());
template.setRoutingKey(props.getRouting());
template.setReplyAddress(props.getReplyQueue());
template.setReceiveTimeout(props.getReceiveTimeout());
template.setReplyTimeout(props.getReplyTimeout());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
classMapper.setDefaultType(Event.class);
messageConverter.setClassMapper(classMapper);
template.setMessageConverter(messageConverter);
configurableBeanFactory.registerSingleton(service + "Template", template);
if(!StringUtils.isEmpty(props.getReplyQueue())) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(props.getReplyQueue());
container.setMessageListener(new MessageListenerAdapter(template));
configurableBeanFactory.registerSingleton(service + "ListenerContainer", container);
container.afterPropertiesSet(); //added this but not working either
container.start(); //added this but not working either
}
}
}

但是当我在回复队列上收到响应时,我有这个错误:

java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': replyQueue1
at org.springframework.util.Assert.state(Assert.java:70)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithFixed(RabbitTemplate.java:1312)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1251)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1218)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1189)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1156)

因此,SimpleMessageListenerContainer似乎没有正确实例化/配置。

你知道问题出在哪里吗?

我的代码发送和接收:

@Autowired
private ApplicationContext context;
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private GatewayProperties properties;
@PostMapping("/{service}")
public ResponseEntity<Object> call(@PathVariable("service") String service, @RequestBody Event body) {
ServiceProperties serviceProperties = properties.getRoutes().get(service);
Queue queue = QueueBuilder.durable(serviceProperties.getQueue()).build();
rabbitAdmin.declareQueue(queue);
TopicExchange exchange = new TopicExchange(serviceProperties.getExchange());
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(serviceProperties.getRouting()));
Queue replyQueue = null;
if (!StringUtils.isEmpty(serviceProperties.getReplyQueue())) {
replyQueue = QueueBuilder.durable(serviceProperties.getReplyQueue()).withArgument("x-dead-letter-exchange", serviceProperties.getDlExchange()).build();
rabbitAdmin.declareQueue(replyQueue);
Queue dlQueue = QueueBuilder.durable(serviceProperties.getDlQueue()).build();
rabbitAdmin.declareQueue(dlQueue);
TopicExchange dlqExchange = new TopicExchange(serviceProperties.getDlExchange());
rabbitAdmin.declareExchange(dlqExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(dlQueue).to(dlqExchange).with(serviceProperties.getReplyQueue()));
}
RabbitTemplate template = (RabbitTemplate) context.getBean(service + "Template");
Event outputMessage = (Event) template.convertSendAndReceive(serviceProperties.getExchange(), serviceProperties.getRouting(), body, new CorrelationData(UUID.randomUUID().toString()));
//...
}

不清楚为什么要使用回复队列;RabbitMQ 现在提供了一种直接回复机制,该机制消除了使用固定回复队列的大部分原因(一个例外是如果你想要一个 HA 回复队列)。

也就是说,问题是您将模板包装在MessageListenerAdapter中 - 这不是必需的(无论如何都不会起作用) - 模板实现MessageListener.

相关内容

  • 没有找到相关文章

最新更新