如何提高spring引导和MQ应用程序的消息处理速度?



获得了一个小型的spring-boot消息传递应用程序,该应用程序接收来自队列的消息并在DB2表中插入/更新一行。注意到这周它收到了很多消息,但是消耗非常慢,消息填充磁盘(infra抱怨它)。我们如何才能更快地提高从队列读取消息的速度?

JMS配置

...
...
@Bean
public MQXAConnectionFactory mqxaQueueConnectionFactory() {
MQXAConnectionFactory mqxaConnectionFactory = new MQXAConnectionFactory();
log.info("Host: {}", host);
log.info("Port: {}", port);
log.info("Channel: {}", channel);
log.info("Timeout: {}", receiveTimeout);
try {
mqxaConnectionFactory.setHostName(host);
mqxaConnectionFactory.setPort(port);
mqxaConnectionFactory.setQueueManager(queueManager);
if (channel != null && !channel.trim().isEmpty()) {
mqxaConnectionFactory.setChannel(channel);
}
mqxaConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
} catch (JMSException e) {
throw new RuntimeException(e);
}
return mqxaConnectionFactory;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory(MQXAConnectionFactory mqxaConnectionFactory) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(mqxaConnectionFactory);
cachingConnectionFactory.setSessionCacheSize(this.sessionCacheSize);
cachingConnectionFactory.setCacheConsumers(this.cacheConsumers);
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean
@Primary
public SingleConnectionFactory singleConnectionFactory(MQXAConnectionFactory mqxaConnectionFactory) {
SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mqxaConnectionFactory);
singleConnectionFactory.setTargetConnectionFactory(mqxaConnectionFactory);
singleConnectionFactory.setReconnectOnException(true);
return singleConnectionFactory;
}
@Bean
public PlatformTransactionManager platformTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
return new JmsTransactionManager(cachingConnectionFactory);
}
@Bean
public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
jmsTemplate.setReceiveTimeout(receiveTimeout);
return jmsTemplate;
}
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(PlatformTransactionManager transactionManager,
@Qualifier("singleConnectionFactory") SingleConnectionFactory singleConnectionFactory) {
EnhancedJmsListenerContainerFactory factory = new EnhancedJmsListenerContainerFactory();
factory.setConnectionFactory(singleConnectionFactory);
factory.setTransactionManager(transactionManager);
factory.setConcurrency(concurrency);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setSessionTransacted(true);
factory.setMaxMessagesPerTask(this.messagesPerTask);
factory.setIdleTaskExecutionLimit(this.idleTaskExecutionLimit);
return factory;
}
...
...

我们从CachingConnectionFactory转移到SingleConnectionFactory,因为它打开了太多队列连接。

队列监听器

@Transactional
@Slf4j
@Component
@Profile("!thdtest")
public class QueueListener {
private CatalogStatusDataProcessor catalogStatusDataProcessor;
private HostToDestDataAckProcessor hostToDestDataAckProcessor;
@Autowired
public QueueListener(CatalogStatusDataProcessor catalogStatusDataProcessor, HostToDestDataAckProcessor hostToDestDataAckProcessor) {
this.catalogStatusDataProcessor = catalogStatusDataProcessor;
this.hostToDestDataAckProcessor = hostToDestDataAckProcessor;
}
@JmsListener(destination = "${project.mq.queue}", containerFactory = "defaultJmsListenerContainerFactory")
public void onMessage(MAOMessage message) throws Exception {
String messageString = message.getStringMessagePayload();
try {
if (log.isDebugEnabled())
log.debug("Message received = "+messageString);
StopWatch sw = new StopWatch("Received message");
sw.start();
Object obj= XMLGenerator.generateTOfromXML(messageString);
if(obj instanceof ResponseTO){
catalogStatusDataProcessor.processCatalogStatusInfo((ResponseTO)obj);
}
else if(obj instanceof HostToStoreDataAckWrapper){
hostToDestDataAckProcessor.processHostToDestDataACK(messageString);
}
sw.stop();
log.info("Message is processed in = "+sw.getTotalTimeSeconds() +" seconds");
} catch (Exception e) {
log.error("Exception in processing message: {}", e);
throw e;
}
}
}

尝试将并发设置从2-4更改为4-6,但没有多大改善。使用弹簧引导1.5.4。RELEASE, JdK8, javax。jms 2.0.1, MQ allclient 9.0

您怎么知道这是MQ问题呢?根据我的经验(MQ性能),大多数性能问题都是由于其他处理造成的MQGET(快)数据库更新(慢)..提交。

我会把一些代码放在不同的组件周围,并对它们进行计时。例如

time1MQGET得到time2

计算时间2 -时间1如果>10 ms报告此(或将其添加到全局计数器)

做数据库工作获得时间3计算时间3 -时间2

If δ>10毫秒报告这个(或将其添加到全局计数器)。

如果没有报告任何问题,将时间从10ms降为2ms。

如果这没有报告任何问题,请尝试添加程序的其他实例,因为可能是工作的速度比一个线程可以处理它的速度快。

我也看到减少线程有帮助!当他们执行数据库插入/更新操作时,这将导致争用,并且所有线程都在等待持有锁的线程——您可以从长数据库时间中看到这一点。

作为第一步,您可以打开一些MQ跟踪以报告MQ调用所需的时间....但是添加我提到的代码是很好的实践-特别是如果他们的问题是在其他地方。

最新更新