如何为 RabbitMQ 和 Spring 动态注册一个队列及其独占的消费者/监听器?



我有以下问题要解决:我正在尝试使用RabbitMQ消息实现简单的延迟重试机制。我正在拥有一个基础架构,这为我提供了传递消息的延迟。我可以有许多有兴趣的参与者,他们想在运行时利用这种延迟的重试机制。

参与者只想向我提供2个详细信息和信息: 1.他们希望延迟说t秒后传递消息的队列名称。 2.队列的消费者(例如消息的消费者。)

我试图做以下操作:

private void startSeparateListener(final Object messageConsumer, 
                                   final Queue queue) {
        SimpleMessageListenerContainer simpleMessageListenerContainer 
                    = myCustomeSimpleMessageListenerFactory.create();
        simpleMessageListenerContainer.setRabbitAdmin(rabbitAdmin);
        simpleMessageListenerContainer.setQueues(queue);
        simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(messageConsumer));
        simpleMessageListenerContainer.start();
    }

请注意,队列已经在Rabbitadmin中创建和注册,并且对象消费者具有一种名为handlemessage的方法来收听队列。

这是在运行时与消息消费者动态注册队列的正确方法?

注意:Spring已经提供了一种类型的bean SimpleMessageListEnerContainer,但会使用bean动态添加队列和消费者会导致say q1的意外消费者的问题被称为另一个队列的接收器的一部分,q2的内容类型可能是其内容类型的相同Q1?

我尝试了很多搜索,但无法掌握任何具体的解释。如果是重复的问题和任何天真的问题,请事先道歉。

我不能以某种方式编译您的问题,但是我可以说的是,RabbitMQ和SRING AMQP中已经有执行的交换解决方案很好地支持了它。

我建议远离动态添加的 SimpleMessageListenerContainer:看起来并不是看起来如此。有一个选项,例如addQueueNames()

/**
 * Add queue(s) to this container's list of queues. The existing consumers
 * will be cancelled after they have processed any pre-fetched messages and
 * new consumers will be created. The queue must exist to avoid problems when
 * restarting the consumers.
 * @param queueName The queue to add.
 */
@Override
public void addQueueNames(String... queueName) {

因此,您可以考虑不添加新容器,而是在现有的排队中添加新的队列。amqp_consumerQueue的下游路由可能有助于将消息与不同的队列区分开。

最新更新