与Quarkus中的消息队列连接时出错



我正试图在Quarkus中消费/写入消息队列,但一直无法做到这一点。我有一个示例代码,可以用来连接到队列,但它是用基于javax.jms、的com.ibm.mq.allclient库创建的

<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.0.4.0</version>
</dependency>

要连接,请使用以下参数:主机名、端口、名称、通道、队列名称。

示例代码使用com.ibm.mq.allclient库创建用于消费和写入的连接,如下所示:

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass {
private static Logger logger = LoggerFactory.getLogger(MyClass.class);
private MQQueueConnection connectionRq;
private MQQueueConnection connectionRs;
public void initConnection(){

try {
MQQueueConnectionFactory connectionFactoryRq = new MQQueueConnectionFactory();
connectionFactoryRq.setHostName("localhost");
connectionFactoryRq.setPort(5672);
connectionFactoryRq.setTransportType(1);
connectionFactoryRq.setQueueManager("QM_NAME");
connectionFactoryRq.setChannel("CHANNEL");
MQQueueConnectionFactory connectionFactoryRs = new MQQueueConnectionFactory();
connectionFactoryRs.setHostName("localhost");
connectionFactoryRs.setPort(5672);
connectionFactoryRs.setTransportType(1);
connectionFactoryRs.setQueueManager("QM_NAME");
connectionFactoryRs.setChannel("CHANNEL");
connectionRq = (MQQueueConnection) connectionFactoryRq.createQueueConnection();
connectionRs = (MQQueueConnection) connectionFactoryRs.createQueueConnection();

} catch (Exception e) {
logger.info(e.getMessage());
}
}
public String sendMessage(String msg, String correlativeId){

String corId = null;
MQQueueSession sessionRq = null;
MQQueueSession sessionRs = null;
MessageProducer producer = null;
try {
sessionRq = (MQQueueSession) connectionRq.createQueueSession(false, 1);
MQQueue queueRq = (MQQueue) sessionRq.createQueue("queue:///QUEUENAME.RQ");
queueRq.setMessageBodyStyle(1);
queueRq.setTargetClient(1);
sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
producer = sessionRq.createProducer(queueRq);
Message messageRq = sessionRq.createTextMessage(msg);
messageRq.setJMSReplyTo(queueRs);
messageRq.setIntProperty("JMS_IBM_Character_Set", 819);
messageRq.setIntProperty("JMS_IBM_Encoding", 273);
messageRq.setIntProperty("JMS_IBM_MsgType", 8);
messageRq.setJMSMessageID(correlativeId);
messageRq.setJMSCorrelationIDAsBytes(correlativeId.getBytes());
messageRq.setJMSPriority(1);
messageRq.setJMSType("Datagram");
producer.send(messageRq);
corId = messageRq.getJMSCorrelationID();
} catch (Exception e) {
logger.info(e.getMessage());
}  finally {
try {
sessionRq.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
sessionRs.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
producer.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
return corId;
}
public String consumerMessage(String correlativeId){
String msg = null;
MQQueueSession sessionRs = null;
MessageConsumer consumer = null;
try {
sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
consumer = sessionRs.createConsumer(queueRs, "JMSCorrelationID='" + correlativeId + "'");
connectionRs.start();
Message messageRs = consumer.receive(10000L);
msg = ((TextMessage) messageRs).getText();
} catch (Exception e) {
logger.info(e.getMessage());
}  finally {
try {
sessionRs.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
consumer.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
return msg;
}
public static void main(String[] args) {
MyClass myClass = new MyClass();
myClass.initConnection();
String corId = myClass.sendMessage("Test Message Send", "UNIQUE");
String message = myClass.consumerMessage(corId);
logger.info("The message: " + message);
}
}

上面的代码运行良好,问题是库与本地Quarkus编译器不兼容。

对于与Quarkus中MQ的连接,我使用库:

<dependency>
<groupId>org.amqphub.quarkus</groupId>
<artifactId>quarkus-qpid-jms</artifactId>
</dependency>

在我分配的application.properties中:

quarkus.qpid-jms.url=amqp://localhost:5672

当尝试在Quarkus Qpid JMS Quickstart中运行示例项目时,会出现以下错误:

__  ____  __  _____   ___  __ ____  ______ 
--/ __ / / / / _ | / _ / //_/ / / / __/ 
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /    
--________/_/ |_/_/|_/_/|_|____/___/   
2022-03-17 12:02:31,489 INFO  [org.acm.jms.PriceConsumer] (pool-11-thread-1) Writing MQ Client...
2022-03-17 12:02:31,486 INFO  [org.acm.jms.PriceConsumer] (pool-10-thread-1) Reading MQ Client...
2022-03-17 12:02:31,757 INFO  [io.quarkus] (Quarkus Main Thread) jms-quickstart 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.2.Final) started in 4.626s. Listening on: http://localhost:8080
2022-03-17 12:02:31,758 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-17 12:02:31,759 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, qpid-jms, resteasy, smallrye-context-propagation, vertx]
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-11-thread-1) Failed to connect to remote at: amqp://localhost:5672
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-10-thread-1) Failed to connect to remote at: amqp://localhost:5672

我知道这很可能是一个配置错误,我错过了一些东西,但我是Quarkus的新手,我已经阅读并尝试了很多东西,我已经崩溃了。

我很感激任何形式的帮助,因为这将是伟大的,或者至少文件或指导我的东西也很受欢迎。

查阅的文件:

  • 使用Quarkus和GraalVM开发JMS应用程序
  • 使用JMS

当您尝试连接到localhost:5672时,连接失败。这意味着您在该TCP/IP端口上没有任何侦听。您引用的指令有一个模板MQSC脚本,该脚本设置队列管理器上所需的资源,以允许AMQP连接。这个脚本可以在这里找到。

这个脚本中与您报告的特定问题最相关的命令可能是以下命令(尽管我建议您运行整个脚本(:

START SERVICE(SYSTEM.AMQP.SERVICE)
START CHANNEL(SYSTEM.DEF.AMQP)

您的队列管理器使用哪种版本的IBM MQ?

因为在IBM MQ v9.2.1之前,它只通过AMQP支持Pub/Sub。如果您想使用点对点拓扑(获取/放入队列(,那么您需要您的队列管理器至少为MQ v9.2.1。请参见此处。

第一段具有蓝色标签";v9.2.1";。这意味着当该功能被引入IBMMQ.时

现在,您可以伪造它,通过设置一个指向队列的管理主题对象,然后使用AMQP Pub/Sub访问队列,但将队列管理器升级到MQ v9.2.1或更高版本会简单得多。当前的IBM MQ CD版本是v9.2.5。

注意:IBM MQ v9.2的LTS版本还没有此功能。它将包含在MQ的下一个主要版本中(即v9.3或其他名称(。

最新更新