我有一个Apache Camel路由和处理器从ActiveMQ代理消费。
路线代码 -
@Component
public class MyRoute extends RouteBuilder {
private String mySubscription;
private MyProcessor myProcessor;
public MyRoute(@Value("${my.topic}") String mySubscription, MyProcessor myProcessor) {
this.mySubscription = mySubscription;
this.myProcessor = myProcessor;
}
@Override
public void configure() {
from(mySubscription)
.unmarshal().json(JsonLibrary.Jackson, MyDTO.class)
.bean(myProcessor, "process(${body})")
.end();
}
}
处理器代码 -
@Slf4j
@Component
@AllArgsConstructor
public class MyProcessor {
public void process(MyDTO dto) {
//code that calls HTTP URLs
}
}
配置如下 -
spring:
application:
name: my_listener
//Bean prefixes
pooledConnectionFactory:
maxConnections: 10
connectionFactory:
brokerURL: ${brokerURL}
redeliveryPolicy:
backOffMultiplier: 2.0
useExponentialBackOff: true
redeliveryDelay: 60000
maximumRedeliveries: 5
component:
forceSendOriginalMessage: true
concurrentConsumers: 15
//A bunch of HTTP URLs
brokerURL: <brokerURL>
在本地运行侦听器并将 VisualVM 指向本地 ActiveMQ 代理时,我可以在 VisualVM 线程选项卡下看到 15 个名称包含订阅名称的线程。如果我发送了 4 条消息,我会看到 4 个不同的线程进入运行状态。我没有看到任何可识别为处理器类对象的线程,尽管 MBeans 选项卡中的处理器下有一个 bean。在该 bean 上调用 getTotalExchange(( 显示 4 = 发送的消息数。
concurrentConsumer设置(此处为 15(是否仅创建 15 个线程以供消费?通过处理器类的处理是否仍按顺序进行?还是每个订阅线程在其自己的线程中调用处理器对象,使处理器逻辑成为多线程?
concurrentConsumer设置(此处为15(是否仅创建15个线程用于消费?
安斯 - 是的,懒惰。您可以通过提供自己的线程池来控制这一点。
通过处理器类的处理是否仍按顺序进行?还是每个订阅线程在其自己的线程中调用处理器对象,使处理器逻辑成为多线程?
答 - 不,它并行发生,就像任何多线程应用程序一样,这意味着如果使用任何共享数据,则进程方法应该是线程安全的。