我目前正在用Apache Camel和ActiveMQ实现一个Java消息传递系统。我的目标是根据消息具有的几个属性动态地设置消息的优先级。
我已经按照这里的说明配置了我的ActiveMQ。然后,我创建了以下发送TextMessage
的方法:
public void send(BaseMessage baseMessage, int jmsPriority) throws JsonProcessingException {
Map<String, Object> messageHeaders = new HashMap<>();
messageHeaders.put(MESSAGING_HEADER_JMS_PRIORITY, jmsPriority);
messageHeaders.put(MESSAGING_HEADER_TYPE, baseMessage.getClass().getSimpleName());
String payload = objectMapper.writeValueAsString(baseMessage);
producerTemplate.sendBodyAndHeaders(payload, messageHeaders);
}
发送消息完美工作,BaseMessage
的动态类型被正确设置为每个消息的报头。优先级也会设置,但会被忽略。输出消息的顺序仍然是FIFO,就像队列通常做的那样。
到目前为止,我还没有实现动态设置消息的优先级。我不想使用Apache Camel的Resequencer,因为我必须为"排序"创建几个新的队列。从我的角度来看,ActiveMQ必须能够对消息本身进行优先级排序和重新排序。
任何提示都是感激的。如果需要进一步的细节可以问我。
默认情况下,ActiveMQ禁用消息优先级。这很正常。当执行分布式消息传递——跨服务器发送消息时,优先级实际上并不有效,因为代理只能扫描队列中具有较高优先级的消息,然后才开始减慢该队列的所有流量。
优先级消息在嵌入代理并将其用于任务调度时可以很好地工作——其中队列深度通常不超过低几千。
更新:
提示——JMS中的QOS设置必须在MessageProducer对象上设置,而不是消息。
启用优先级消息