使用spring cloud stream 3.0+使用RabbitMQ队列中的消息



我有一个Producer,它使用直接交换在RabbitMQ队列中生成消息。

队列名称:TEMP_queue,交换机名称:TEMP_DIRECT_exchange

生成这个队列很容易,因为在我的生产者应用程序中,我使用了我熟悉的SpringAMQP。

在我的Consumer应用程序上,我需要使用Springcloudstream3.0+.版本

我希望避免使用像@EnableBinding@StreamListener这样的遗留注释,因为它们即将被取消缓存。

我的应用程序的遗留代码如下所示:

@EnableBinding(Bindings.class)
public class TempConsumer {
@StreamListener(target = "TEMP_QUEUE")
public void consumeFromTempQueue(MyObject object) {
// do stuff with the object
}
}
public interface Bindings {
@Input("TEMP_QUEUE")
SubscribableChannel myInputBinding();
}

从他们的文件中,我发现我可以做一些类似的事情

@Bean
public Consumer<MyObject> consumeFromTempQueue() {
return obj -> {
// do stuff with the object
};
}

我不清楚如何指定此bean将从TEMP_QUEUE消费?此外,如果我想从多个队列中消费,该怎么办?

请参阅从现有队列/交换机消耗。

您可以使用从多个队列消费

spring.cloud.stream.bindings.consumeFromTempQueue-in-0.destination=q1,q2,q3
spring.cloud.stream.bindings.consumer.multiplex=true

如果没有多路复用,您将获得3个绑定;使用多路复用,您将获得一个侦听器容器来侦听多个队列。

您需要使用application.yml来绑定您的bean。

spring.cloud.stream:
function.definition: consumeFromTempQueue

您也可以使用此配置来配置源、进程和接收器。在你的情况下,你只是在使用一个来源。

你可以阅读这篇文章了解更多信息。

相关内容

  • 没有找到相关文章

最新更新