ActiveMQ设置,服务器,属性都在jndi.properties文件中。
例:
java.naming.provider.url=failover:(tcp://localhost:61616?keepAlive=true)
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
queue.MyQueue = testUpdate
虽然我的程序看起来像这样:
public class MQReader{
public final String JNDI_FACTORY = "ConnectionFactory";
public final String QUEUE = "MyQueue";
private QueueConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private QueueReceiver queueReceiver;
private Queue queue;
public static void main(String[] args) throws Exception {
// create a new intial context, which loads from jndi.properties file
javax.naming.Context ctx = new javax.naming.InitialContext();
MQReader reader = new MQReader();
reader.init(ctx);
try {
reader.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
reader.close();
}
public void init(Context context) throws NamingException, JMSException {
queueConnectionFactory = (QueueConnectionFactory) context.lookup(JNDI_FACTORY);
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(QUEUE);
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(
message ->{
try {
if(message != null) {
//do stuff like print message for testing.
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: " + jmse.getMessage());
}
}
);
queueConnection.start();
}
public void close() throws JMSException {
queueReceiver.close();
queueSession.close();
queueConnection.close();
}
}
我认为 jndi 中的故障转移项目应该负责我的重新连接,但它没有。我运行了代理并运行了程序,它运行良好,但是一旦我停止代理,我的消费者程序就会以退出代码 1 退出。"进程已完成,退出代码为 1"
我不确定我在这里做错了什么。我在很多地方添加了打印语句,发现它在 reader.wait() 处退出而没有触发任何异常。
我想通了。由于我的主线程在我启动侦听器线程后退出并且侦听器线程完美运行(非守护程序线程),因此它使 JVM 保持运行。一旦侦听器失去连接,程序就会退出,因为没有非守护进程线程在运行。故障转移协议代码作为守护程序线程运行,因此 JVM 退出程序而不让故障转移协议代码重新连接。
所以我所做的是添加这段代码,不是最好的方法,但它适用于我正在尝试做的事情。
Scanner in = new Scanner(System.in);
while(true){
System.out.println("Please enter "stop" to stop the program.");
String command = in.nextLine();
if("stop".equalsIgnoreCase(command)){
reader.close();
System.exit(0);
}
}
而不是
try {
reader.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
reader.close();
等待方法崩溃,并且没有使主线程保持活动状态。这也是一种保持程序运行的方法,而无需向队列发送消息以停止它。