我正在尝试使用Spring Boot和ActiveMQ使用JmsTemplate发送具有不同优先级的消息,但它不起作用。
我试过这个:
MessageCreator mc = session -> {
TextMessage tm = session.createTextMessage("hello");
tm.setJMSPriority(6);
return tm;
};
jmsTemplate.send((Queue) () -> "box", mc);
ActiveMQ代理内部的优先级仍然是4(默认值(。
我发现实际更改消息优先级的唯一方法是在JmsTemplate级别更改优先级。
jmsTemplate.setPriority(3);
现在的问题是,在此之后发送的所有消息都将具有优先级3。我知道我可以在每次发送后重置JmsTemplate优先级,但它不是"干净的",那么并发呢?
如何使用@JmsListener设置每条消息的优先级并获得优先级最高的消息?
我刚刚遇到了同样的问题。
我测试了您关于设置jmsTemplate优先级的观点,您的假设是正确的。它没有用并发处理好。
我发现有效的解决方案(尽管不理想(是扩展JmsTemplate并重写doSend方法,将JmsPriority从消息复制到生产者。这并不理想,在spring-boot版本上扩展类make break(我已经在2.1.7上测试过了(,还有一些额外的步骤来注册新的JmsTemplate。但它确实有效,我已经在负载下进行了测试。
步骤。。。。。
创建一个扩展JmsTemplate的新类,覆盖doSend方法,从消息复制优先级
import java.io.Serializable;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.springframework.jms.core.JmsTemplate;
public class RcsJmsTemplate extends JmsTemplate implements Serializable {
public RcsJmsTemplate() {
}
public RcsJmsTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
/**
* Actually send the given JMS message.
*
* AF: EXTENDED TO COPY THE PRIORITY FROM THE MESSAGE TO THE PRODUCER
*
* @param producer the JMS MessageProducer to send with
* @param message the JMS Message to send
* @throws JMSException if thrown by JMS API methods
*/
@Override
protected void doSend(MessageProducer producer, Message message) throws JMSException {
if (getDeliveryDelay() >= 0) {
producer.setDeliveryDelay(getDeliveryDelay());
}
producer.send(message, getDeliveryMode(), message.getJMSPriority(), getTimeToLive());
}
}
添加一个bean(到你的App.java或适当的配置类(你可能不需要通过消息转换器(我在我的项目中使用Jackson(你可能还需要将其他配置应用到新的JmsTemplate。
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RcsJmsTemplate rcsJmsTemplate = new RcsJmsTemplate(connectionFactory);
rcsJmsTemplate.setMessageConverter(messageConverter);
return rcsJmsTemplate;
}
然后,如您的问题中所述,设置消息的JmsPriority属性。你使用的是MessageCreator,但在我的项目中,我使用的是的消息后处理
public void convertAndSendWithPriority(JmsTemplate jmsTemplate, String destination, Object message, int priority) {
jmsTemplate.convertAndSend(destination, message, (Message jmsMessage) -> {
jmsMessage.setJMSPriority(priority);
return jmsMessage;
});
}
为了完整起见,您应该添加属性:spring.jms.template.qos enabled=true
就是这样。希望有帮助(事实上,我希望有人能想出一个更好的答案(感谢