我正在使用 Spring 集成 jms 通道来使用队列中的消息并对其进行处理。
这是我的入站通道配置.xml
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="routingChannel"
connection-factory="cachingConnectionFactory"
error-channel="errorChannel"
concurrent-consumers="${jms_adapter_concurrent_consumers}" />
在这里,当我将并发消费者设置为大于 1 的值时,我使用的消息在处理时会损坏。我正在使用队列中的 XML 和 Json 消息,在解析数据时,我可以看到它的某些内容已更改并设置为某个随机值。
上述配置仅在并发使用者值设置为 1 时才能正常工作。
我的问题是,当我将并发消费者设置为大于 1 的值时,我是否必须手动同步(使线程安全(我的代码?
是的,您的代码必须是线程安全的。任何多线程代码都是如此。
但是,同步整个事情将有效地击败并发性。最好使用无状态代码(无字段(,或使用线程安全变量(AtomicInteger
和朋友(,或将同步限制为小块。
如果同步整个侦听器代码,则一次只能处理一个容器线程。
Vishal和我一起工作。
我必须提到正在使用缓存连接工厂,我注意到在这篇文章中您不鼓励使用它。
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="MQConnectionFactory" />
<property name="sessionCacheSize" value="10"/>
</bean>
@Bean(name="MQConnectionFactory")
public ConnectionFactory connectionFactory() {
if (factory == null) {
factory = new MQConnectionFactory();
try {
factory.setHostName(env.getRequiredProperty(HOST));
factory.setPort(Integer.parseInt(env.getRequiredProperty(PORT)));
factory.setQueueManager(env.getRequiredProperty(QUEUE_MANAGER));
factory.setChannel(env.getRequiredProperty(CHANNEL));
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
return factory;
}
这会导致问题吗?到目前为止,问题似乎发生在 Spring 集成的默认消息转换器中,因为在某些情况下,有效负载的一部分是"。
干杯 克里斯