以下类包含在几个消费者应用程序中:
@Component
@Configuration
public class HealthListener {
public static final String HEALTH_CHECK_QUEUE_NAME = "healthCheckQueue";
public static final String HEALTH_CHECK_FANOUT_EXCHANGE_NAME = "health-check-fanout";
@Bean
public Binding healthListenerBinding(
@Qualifier("healthCheckQueue") Queue queue,
@Qualifier("instanceFanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public FanoutExchange instanceFanoutExchange() {
return new FanoutExchange(HEALTH_CHECK_FANOUT_EXCHANGE_NAME, true, false);
}
@Bean
public Queue healthCheckQueue() {
return new Queue(HEALTH_CHECK_QUEUE_NAME);
}
@RabbitListener(queues = HEALTH_CHECK_QUEUE_NAME)
public String healthCheck() {
return "some result";
}
}
我正在尝试向扇出交换发送消息,并接收所有回复,以了解哪些使用者正在运行。
我可以发送一条消息并得到这样的第一个回复:
@Autowired
RabbitTemplate template;
// ...
String firstReply = template.convertSendAndReceiveAsType("health-check-fanout", "", "", ParameterizedTypeReference.forType(String.class));
但是,我需要对这条消息感到最满意,而不仅仅是第一条。我需要设置一个回复侦听器,但我不确定如何设置。
(convertS|s)endAndReceive.*()
方法不是为处理多个回复而设计的;它们严格来说是一个请求/一个回复方法。
您需要使用 (convertAndS|s)end()
方法来发送请求,并实现自己的回复机制,也许使用侦听器容器来回复,以及一些组件来聚合回复。
你可以使用类似 Spring Integration Aggregator 的东西,但你需要一些机制(ReleaseStrategy
)来知道何时收到所有预期的回复。
或者,您可以简单地接收离散回复并单独处理它们。
编辑
@SpringBootApplication
public class So54207780Application {
public static void main(String[] args) {
SpringApplication.run(So54207780Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("fanout", "", "foo", m -> {
m.getMessageProperties().setReplyTo("replies");
return m;
});
}
@RabbitListener(queues = "queue1")
public String listen1(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "queue2")
public String listen2(String in) {
return in + in;
}
@RabbitListener(queues = "replies")
public void replyHandler(String reply) {
System.out.println(reply);
}
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("fanout");
}
@Bean
public Queue queue1() {
return new Queue("queue1");
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(fanout());
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(fanout());
}
@Bean
public Queue replies() {
return new Queue("replies");
}
}
和
FOO
foofoo