我想使用简单的Processor/AsyncProcessor作为目标并行处理队列中的消息。处理器处理每条消息需要一点时间,但每条消息都可以单独处理,因此可以同时处理(在健康的范围内)。
我很难找到示例,尤其是关于骆驼路线的xml配置。
到目前为止,我已经定义了一个线程池、路由和处理器:
<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
<from uri="broker:queue:inbox" />
<threads executorServiceRef="smallPool">
<to uri="MyProcessor" />
</threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />
我的处理器看起来像:
public class MyProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
String msg = in.getBody(String.class);
System.out.println(msg);
try {
Thread.sleep(10 * 1000); // Do something in the background
} catch (InterruptedException e) {}
System.out.println("Done!");
}
}
不幸的是,当我将消息发布到队列时,它们仍然被逐一处理,每个消息延迟10秒(我的"后台任务")。
有人能为我指明正确的方向,让我使用定义的线程池处理消息吗?或者解释我做错了什么?
您应该使用评论中所说的concurrentConsumers选项,
<route>
<from uri="broker:queue:inbox?concurrentConsumers=5" />
<to uri="MyProcessor" />
</route>
请注意,还有maxConcurrentConsumers
,您可以设置为使用最小/最大范围的并发消费者,因此Camel将根据负载自动增长/收缩。
请参阅的JMS文档中的更多详细信息
- http://camel.apache.org/jms