使用spring jmstemplate / activemq与请求 /响应一起使用,而无需阻止队列中的其他消息



我们为特定用例实现了一个分布式请求/响应类型架构,我们想等待响应。我们使用的JMS经纪人是ActiveMQ,并且使用Spring连接代码。

我们看到的问题是,如果将一堆请求发送到同一目的地,则任何请求,例如,任何请求都需要大量时间来完成,请阻止跟随它的请求消息。消费者使用的SessionAwareMessageListener接口仅支持OnMessage((方法。在这里实现并行性的最佳方法是什么,即如果特定请求需要很长时间,则不应阻止队列中的其他消息?

有这篇文章,但没有回答我的问题。JMS:我们可以从onMessage((withtout commit或滚动

中从队列中获取多个消息

谢谢

代码的相关片段(为简短删除了例外处理等(

生产者

public class MyJmsProducer {
private ProcessingResponse sendMessage(final Serializable serializable) {
    //send JMS request and wait for response
    return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class); //this operation seems to be blocking + sync
   }
}

和听众(消费者(

public class MyJmsListener
    implements SessionAwareMessageListener<Message>, NotificationHandler<Task> {
@Override
public void onMessage(Message message, Session session)
        throws JMSException {
    ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject();
    // handle the request here (THIS COULD TAKE A WHILE)
    handleRequest(processingRequest);

    // done handling the request, now create a response message
    final ObjectMessage responseMessage = new ActiveMQObjectMessage();
    responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
    responseMessage.setObject(processingResponse);
    // Message sent back to the replyTo address of the income message.
    final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
    producer.send(responseMessage);
  }
}

您可以使用DMLC的ConcurrentConsumers提高消息的消耗速度并解决缓慢的消费者问题:

@Bean
public DefaultMessageListenerContainer dmlc() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setMaxConcurrentConsumers(10);
    dmlc.setConcurrentConsumers(5);
    return dmlc;
}

您需要调整预摘要对并发消费者:

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

所有消息都被派发到第一个连接的消费者,当另一个消息连接到同一目的地时,他没有接收消息,因此要更改此行为,您需要将Prefetchpolicy设置为比默认值更低的值。例如,将此jms.prefetchPolicy.queuePrefetch=1添加到Activemq.xml中的URI Config或在客户端URL上设置

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
    return connectionFactory;
}

建议使用高性能的高预取值 消息量。但是,对于较低的消息量,每个消息 消息需要很长时间才能处理,预摘要应设置为1。 这样可以确保消费者一次仅处理一条消息。 但是,指定零的预取限制将导致消费者 要调查消息一次,而不是消息是 推到消费者。

看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html

http://activemq.apache.org/destination-options.html

最新更新