我们为特定用例实现了一个分布式请求/响应类型架构,我们想等待响应。我们使用的JMS经纪人是ActiveMQ,并且使用Spring连接代码。
我们看到的问题是,如果将一堆请求发送到同一目的地,则任何请求,例如,任何请求都需要大量时间来完成,请阻止跟随它的请求消息。消费者使用的SessionAwareMessageListener接口仅支持OnMessage((方法。在这里实现并行性的最佳方法是什么,即如果特定请求需要很长时间,则不应阻止队列中的其他消息?
有这篇文章,但没有回答我的问题。JMS:我们可以从onMessage((withtout commit或滚动
中从队列中获取多个消息谢谢
代码的相关片段(为简短删除了例外处理等(
生产者
public class MyJmsProducer {
private ProcessingResponse sendMessage(final Serializable serializable) {
//send JMS request and wait for response
return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class); //this operation seems to be blocking + sync
}
}
和听众(消费者(
public class MyJmsListener
implements SessionAwareMessageListener<Message>, NotificationHandler<Task> {
@Override
public void onMessage(Message message, Session session)
throws JMSException {
ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject();
// handle the request here (THIS COULD TAKE A WHILE)
handleRequest(processingRequest);
// done handling the request, now create a response message
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(processingResponse);
// Message sent back to the replyTo address of the income message.
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}
}
您可以使用DMLC的ConcurrentConsumers
提高消息的消耗速度并解决缓慢的消费者问题:
@Bean
public DefaultMessageListenerContainer dmlc() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setMaxConcurrentConsumers(10);
dmlc.setConcurrentConsumers(5);
return dmlc;
}
您需要调整预摘要对并发消费者:
persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)
所有消息都被派发到第一个连接的消费者,当另一个消息连接到同一目的地时,他没有接收消息,因此要更改此行为,您需要将Prefetchpolicy设置为比默认值更低的值。例如,将此jms.prefetchPolicy.queuePrefetch=1
添加到Activemq.xml中的URI Config或在客户端URL上设置
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
return connectionFactory;
}
建议使用高性能的高预取值 消息量。但是,对于较低的消息量,每个消息 消息需要很长时间才能处理,预摘要应设置为1。 这样可以确保消费者一次仅处理一条消息。 但是,指定零的预取限制将导致消费者 要调查消息一次,而不是消息是 推到消费者。
看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html
和
http://activemq.apache.org/destination-options.html