使用JMS API访问IBM MQ的BOTHRESH值



我使用带有mq-jms-spring-boot-starter的spring-boot来创建一个JMS Listener应用程序,该应用程序从队列中读取消息,对其进行处理并将消息转发到另一个队列。

在出现毒消息的情况下,我正在尝试生成警报。然而,为了避免对同一条消息生成多个警报,我考虑比较JMSXDeliveryCountBOTHRESH的值,并在发送到BOQ之前的最后一次重新交付中生成警报。

CCD_ 4和CCD_。

@JmsListener(destination = "${sourceQueue}")
public void processMessages(Message message) {
TextMessage msg = (TextMessage) message;
int boThresh;
int redeliveryCount;
try {
boThresh = message.getIntProperty("<WHAT COMES HERE>");
redeliveryCount = message.getIntProperty("JMSXDeliveryCount");
String processedMessage = this.processMessage(message);
this.forwardMessage("destinationQueue", processedMessage);
} catch (Exception e) {
if (redeliveryCount >= boThresh) {
//generate alert here
}
}
}

我应该如何在这里获得BOTHRESH的值?有可能吗?我尝试使用getPropertyNames()方法获取所有可用的属性,下面是我看到的所有属性。

  • JMS_IBM_Format
  • JMS_IBM_PutDate
  • JMS_IBM_Character_Set
  • JMSXDeliveryCount
  • JMS_IBM_MsgType
  • JMSXUserID
  • JMS_IBM_Encoding
  • JMS_IBM_PutTime
  • JMSXAppID
  • JMS_IBM_PutApplType

这样就可以了,但代码确实需要管理员访问管理通道,这可能不是客户端应用程序的最佳选择。

配置

import com.ibm.mq.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.ibm.mq.constants.CMQC;
import java.util.Hashtable;
@Configuration
public class MQConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${ibm.mq.queueManager:QM1}")
public String qMgrName;
@Value("${app.mq.admin.channel:DEV.ADMIN.SVRCONN}")
private String adminChannel;
@Value("${app.mq.host:localhost}")
private String host;
@Value("${app.mq.host.port:1414}")
private int port;
@Value("${app.mq.adminuser:admin}")
private String adminUser;
@Value("${app.mq.adminpassword:passw0rd}")
private String password;

@Bean
public MQQueueManager mqQueueManager() {
try {
Hashtable<String,Object> connectionProperties = new Hashtable<String,Object>();
connectionProperties.put(CMQC.CHANNEL_PROPERTY, adminChannel);
connectionProperties.put(CMQC.HOST_NAME_PROPERTY, host);
connectionProperties.put(CMQC.PORT_PROPERTY, port);
connectionProperties.put(CMQC.USER_ID_PROPERTY, adminUser);
connectionProperties.put(CMQC.PASSWORD_PROPERTY, password);
return new MQQueueManager(qMgrName, connectionProperties);
} catch (MQException e) {
logger.warn("MQException obtaining MQQueueManager");
logger.warn(e.getMessage());
}
return null;
}

}

获取队列的退出阈值


import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Runner {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.mq.queue:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
Runner(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Bean
CommandLineRunner init() {
return (args) -> {
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,
CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
byte[] charAttrs = new byte[MQC.MQ_Q_NAME_LENGTH];
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
int boThresh = intAttrs[0];
String backoutQname = new String(charAttrs);
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining threshold");
logger.warn(e.getMessage());
}
};
}
}

这听起来像是混合了可重试和不可重试的错误处理。如果您正在跟踪重新交付并需要发送警报,那么您可能不想设置BOTHRESH值,而是在客户端代码中管理它。

推荐的消费者错误处理模式:

  1. 如果消息无效(即错误的JSON或XML(,请立即移动到DLQ。消息的质量永远不会提高,也没有理由重复重试。

  2. 如果处理过程中的"下一步"是关闭(即数据库(,则拒绝传递,并允许重新传递延迟和回退重试。这还有一个好处,即允许队列中的其他使用者尝试处理消息,并消除了一个使用者因搁置消息而有死路的问题。

此外,请考虑使用客户端使用者代码进行监视和警报可能会有问题,因为它结合了不同的功能。如果您的目标是跟踪无效消息,那么监视DLQ通常是一种更好的设计模式,它可以从使用者代码中删除"监视"代码。

最新更新