使用 Spring Boot RabbitMQ 的异步 RPC



我已经使用 spring boot 1.4 和 rabbit mq 实现了基本的异步 RPC 调用。
我打算用这个例子作为沟通的基础 在微服务中。
例如,发布服务器.java和订阅者.java可以是两个相互通信的微服务。

显示的代码工作正常,但我很想知道是否有更好的方法 这样做?

我的查询如下:

  • 为了让订阅者使用@RabbitListener注释监听请求队列,我不必在配置中声明directExchange()binding()bean。
    但是为了让asyncRabbitTemplate从回复队列中读取响应,我必须在配置中声明directExchange()binding()bean。
    有什么方法可以避免它,因为我觉得这是我两次声明这些豆子的代码重复。
  • 在实际应用中,微服务之间会有很多这样的调用。根据我的理解,我需要为每个请求-回复调用声明类似的rpcReplyMessageListenerContainer()asyncRabbitTemplate()
    这是对的吗?

代码如下。 链接到 Github

配置.java

@Configuration("asyncRPCConfig")
@Profile("async_rpc")
@EnableScheduling
@EnableRabbit
@ComponentScan(basePackages = {"in.rabbitmq.async_rpc"})
public class Config {
@Value("${queue.reply}")
private String replyQueue;
@Value("${exchange.direct}")
private String directExchange;
@Value("${routingKey.reply}")
private String replyRoutingKey;
@Bean
public Publisher publisher() {
return new Publisher();
}
@Bean
public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
                SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue replyQueueRPC() {
return new Queue(replyQueue);
}
@Bean
public SimpleMessageListenerContainer rpcReplyMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueues(replyQueueRPC());
simpleMessageListenerContainer.setReceiveTimeout(2000);
simpleMessageListenerContainer.setTaskExecutor(Executors.newCachedThreadPool());
return simpleMessageListenerContainer;
}

@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(ConnectionFactory connectionFactory) {
return new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
rpcReplyMessageListenerContainer(connectionFactory),
directExchange + "/" + replyRoutingKey);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(directExchange);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(replyQueueRPC()).to(directExchange()).with(replyRoutingKey);
}

@Bean
public Subscriber subscriber() {
return new Subscriber();
}
}

出版社.java

public class Publisher {
@Value("${routingKey.request}")
private String requestRoutingKey;
@Autowired
private DirectExchange directExchange;
private static SecureRandom SECURE_RANDOM;
static {
try {
SECURE_RANDOM = SecureRandom.getInstanceStrong();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}

@Autowired
private AsyncRabbitTemplate asyncRabbitTemplate;

@Scheduled(fixedDelay = 100 * 1)
public void publishToDirectExchangeRPCStyle() {
Integer integer = SECURE_RANDOM.nextInt();
SampleRequestMessage sampleRequestMessage = new SampleRequestMessage(String.valueOf(integer));
System.out.println("Sending out message on direct directExchange:" + sampleRequestMessage);
AsyncRabbitTemplate.RabbitConverterFuture<SampleResponseMessage> sampleResponseMessageRabbitConverterFuture = asyncRabbitTemplate
.convertSendAndReceive(directExchange.getName(), requestRoutingKey, sampleRequestMessage);
sampleResponseMessageRabbitConverterFuture.addCallback(
sampleResponseMessage ->
System.out.println("Response for request message:" + sampleRequestMessage + " is:" + sampleResponseMessage)
, failure ->
System.out.println(failure.getMessage())
);
}
}

订户.java

public class Subscriber {
@RabbitHandler
@RabbitListener(
bindings = {
@QueueBinding(value = @Queue("${queue.request}"),
key = "${routingKey.request}",
exchange = @Exchange(value = "${exchange.direct}", type = ExchangeTypes.DIRECT, durable = "true"))})
public SampleResponseMessage subscribeToRequestQueue(@Payload SampleRequestMessage sampleRequestMessage, Message message) {
System.out.println("Received message :" + message);
return new SampleResponseMessage(sampleRequestMessage.getMessage());
}
}

你的解决方案很好。

不清楚你在问什么...

我必须在配置中声明 directExchange() 和 binding() bean。 有什么方法可以避免它,因为我觉得这是我两次声明这些豆子的代码重复。

@QueueBinding只是对@RabbitListener的方便,也是将队列、交换和绑定声明为@Beans 的替代方法。

如果您使用的是通用@Config类,则可以简单地省略侦听器上的bindings属性,并使用queues = "${queue.reply}"来避免重复。

我需要为每个请求-回复调用声明类似的 rpcReplyMessageListenerContainer() 和 asyncRabbitTemplate()。 这是对的吗?

是的;尽管在即将发布的 2.0 版本中,您可以使用一个DirectReplyToMessageListenerContainer,以避免在发送消息时为每个服务使用单独的回复队列。

请参阅此处和此处的文档。

从版本 2.0 开始,异步模板现在支持直接回复,而不是配置的回复队列。

(应改为"作为......"的替代,而不是"代替")。

因此,您可以使用同一模板与多个服务通信。

最新更新