使用ActiveMQ并行处理多条消息



我想使用简单的Processor/AsyncProcessor作为目标并行处理队列中的消息。处理器处理每条消息需要一点时间,但每条消息都可以单独处理,因此可以同时处理(在健康的范围内)。

我很难找到示例,尤其是关于骆驼路线的xml配置。

到目前为止,我已经定义了一个线程池、路由和处理器:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
    <from uri="broker:queue:inbox" />
    <threads executorServiceRef="smallPool">
        <to uri="MyProcessor" />
    </threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />

我的处理器看起来像:

public class MyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        String msg = in.getBody(String.class);      
        System.out.println(msg);
        try {
            Thread.sleep(10 * 1000); // Do something in the background
        } catch (InterruptedException e) {}
        System.out.println("Done!");
    }
}

不幸的是,当我将消息发布到队列时,它们仍然被逐一处理,每个消息延迟10秒(我的"后台任务")。

有人能为我指明正确的方向,让我使用定义的线程池处理消息吗?或者解释我做错了什么?

您应该使用评论中所说的concurrentConsumers选项,

<route>
    <from uri="broker:queue:inbox?concurrentConsumers=5" />
    <to uri="MyProcessor" />
</route>

请注意,还有maxConcurrentConsumers,您可以设置为使用最小/最大范围的并发消费者,因此Camel将根据负载自动增长/收缩。

请参阅的JMS文档中的更多详细信息

  • http://camel.apache.org/jms

最新更新