RabbitMQ -Java Spring-如何启动几个队列



我很难找到一种弹簧方式来初始交换,该方式将传入消息发送到更多的1个队列 - 在我的弹簧启动应用程序上:

我找不到定义秒交换标题绑定的好方法。

我正在使用Rabbittemplate作为生产者客户端。

RabbitMQ 6页教程对此并没有真正的帮助,因为:

  1. 最初是消费者按需的唯一临时排队(而我需要制作人进行绑定 - 持续队列)
  2. 这些示例用于基本的Java使用 - 不使用弹簧功能。

我也未能找到如何通过春季AMQP页面实现它。

我到目前为止所得到的,试图将基本的Java绑定到弹簧的方式上 - 但是它不起作用....

@Bean
public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    Connection conn = connectionFactory.createConnection();
    Channel channel = conn.createChannel(false);
    channel.exchangeDeclare(SPRING_BOOT_EXCHANGE, "fanout");
    channel.queueBind(queueName, SPRING_BOOT_EXCHANGE, ""); //first bind
    channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, "");// second bind
    return connectionFactory;
}

任何帮助将不胜感激

编辑

i 思考问题出现的是,每次我重新启动服务器时,它都试图重新定义Exchange-Query-tabining - 而它们持续在经纪人中...我设法通过经纪人UI控制台手动定义它们 - 因此,生产者只知道交换名称,消费者只知道其相关队列。有没有一种方法可以在先进的情况下定义这些元素 - 但是,如果以前的重新启动已经存在,则不会重新定义覆盖?

我们使用类似于以下方法将数据从一个特定输入通道发送到其他消费者的多个输入队列的方法:

@Bean
public IntegrationFlow integrationFlow(final RabbitTemplate rabbitTemplate, final AmqpHeaderMapper amqpHeaderMapper) {
    IntegrationFlows
        .from("some-input-channel")
        .handle(Amqp.outboundAdapter(rabbitTemplate)
        .headerMapper(headerMapper))
        .get()    
}
@Bean
public AmqpHeaderMapper amqpHeaderMapper() {
    final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
    headerMapper.setRequestHeaderNames("*");
    return headerMapper;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
   return new CachingConnectionFactory();
}
@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory rabbitConnectionFactory) {
    final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory rabbitConnectionFactory, final RabbitAdmin rabbitAdmin) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    final FanoutExchange fanoutExchange = new FanoutExchange(MY_FANOUT.getFanoutName());
    fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
    for (final String queueName : MY_FANOUT.getQueueNames) {
        final Queue queue = new Queue(queueName, true);
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        final Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
    }
    rabbitTemplate.setExchange(fanoutExchange);    
}

和完整性这是粉丝声明的枚举:

public enum MyFanout {
    MY_FANOUT(Lists.newArrayList("queue1", "queue2"), "my-fanout"),
    private final List<String> queueNames;
    private final String fanoutName;
    MyFanout(final List<String> queueNames, final String fanoutName) {
        this.queueNames = requireNonNull(queueNames, "queue must not be null!");
        this.fanoutName = requireNonNull(fanoutName, "exchange must not be null!");
    }
    public List<String> getQueueNames() {
        return this.queueNames;
    }
    public String getFanoutName() {
        return this.fanoutName;
    }
}

希望它有帮助!

谢谢!那是我想要的答案。另外 - 为了完整性 - 我在弹簧豆内找到了一种"爪哇方式"的方法:

  @Bean
public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    Connection conn = connectionFactory.createConnection();
    Channel channel = conn.createChannel(false);
    // declare exchnage
    AMQP.Exchange.DeclareOk resEx = channel.exchangeDeclare(AmqpTemp.SPRING_BOOT_EXCHANGE_test, ExchangeTypes.FANOUT, true, false, false, null);
    // declares queues
    AMQP.Queue.DeclareOk resQ = channel.queueDeclare(AmqpTemp.Q2, true, false, false, null);
    resQ = channel.queueDeclare(AmqpTemp.Q3, true, false, false, null);
    // declare binding
    AMQP.Queue.BindOk resB = channel.queueBind(AmqpTemp.Q2, AmqpTemp.SPRING_BOOT_EXCHANGE_test, "");
    resB = channel.queueBind(AmqpTemp.Q3, AmqpTemp.SPRING_BOOT_EXCHANGE_test, "");
    // channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, "");
    return connectionFactory;
}

我以前遇到的问题是对我在最初使用代码中创建了一些队列的事实 - 当我试图重复使用相同的队列名称时,它引起了例外,因为它们最初的定义不同 - 所以 - 所以 - 学习的教训:从您使用代码"播放"时使用的名称重命名了队列。

最新更新