如何在并发环境中尝试重新启动()JMS连接



我有一个JMS应用程序,它试图从JBosss队列中读取。我在班上实现了MessageListener,并使用onMessage()接收消息

public class JBossConnector implements MessageListener, AutoCloseable {}

这是我的方法:

/**
* The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
* If the message is of type MessageObject, then transfer that to Appia
*
* @param message JMS Message
*/
@Override
public void onMessage(Message message) {
// receive the message from jboss queue: 'jbossToAppia'
// then post it to appia
if (message instanceof ObjectMessage) {
try {
MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
System.out.printf("JbossConnector: MessageObject received from JBOSS, %sn", messageObject.getMessageType());
component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);
} catch (MessageFormatException exception) {
logger.error(ExceptionHandler.getFormattedException(exception));
ExceptionHandler.printException(exception);
} catch (JMSException exception) {
ExceptionHandler.printException(exception);
restart();
}
} else {
System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)n", this.getClass().getSimpleName());
}
}

每当我找到JMSException时,我都会尝试重新启动JBoss连接(Context、connection、Session、Receiver、Sender(。我怀疑的是,我读过onMessage()使用多个线程从队列接收消息(如果我错了,请纠正我(。

当JBoss队列连接断开时,至少会有一些队列抛出此异常。这意味着他们都会尝试restart()连接,这是浪费时间(restart()首先关闭所有连接,将变量设置为null,然后尝试启动连接(。

现在我可以做一些类似的事情

synchronized (this){
restart();
}

或者使用CCD_ 7变量。但这并不能保证当当前线程完成restart()操作时,其他线程不会尝试restart()(如果我错了,请再次纠正我(。

有什么解决方案可以让它发挥作用吗?

MessageListeneronMessage()实际上是从它自己的线程运行的,因此您需要适当的并发控制。我认为最简单的解决方案就是使用java.util.concurrent.atomic.AtomicBoolean。例如,在restart()方法中,您可以执行以下操作:

private void restart() {
AtomicBoolean restarting = new AtomicBoolean(false);
if (!restarting.getAndSet(true)) {
// restart connection, session, etc.
}
}

这将使restart()方法具有有效的幂等性。多个线程将能够调用restart(),但只有调用它的第一个线程才会真正导致资源被重新创建。所有其他电话将立即返回。

相关内容

  • 没有找到相关文章

最新更新