activemq UDP jms connection



我正在做一个使用 ActiveMQ(Java)的项目,连接方式可以选择 SSL 或 UDP。接收消息时,我使用 QueueReceiver 和 MessageListener。问题是:SSL 连接工作正常,但当我选择 UDP 连接时,我发送 7 条消息并启动监听器,先收到了 5 条消息,之后就只收到 2 条消息。我不知道问题到底出在哪里,有人能帮忙吗?下面是代码。

代码片段:

switch (protocol) {
case "UDP":
connectionFactoryUDP = new ActiveMQConnectionFactory(brokerUrlUDP);
connection = connectionFactoryUDP.createQueueConnection(username, password);

break;
case "TLS":
connectionFactoryTLS = new ActiveMQSslConnectionFactory();
// create connection
connectionFactoryTLS.setBrokerURL(brokerUrlTLS);
connectionFactoryTLS.setTrustStore(trustStore);
connectionFactoryTLS.setTrustStorePassword(trustStorePassword);
connectionFactoryTLS.setKeyStore(keyStore);
connectionFactoryTLS.setKeyStorePassword(keyStorePassword);
connection = connectionFactoryTLS.createQueueConnection(username,password);

break;}

//ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.username, this.password, "tcp://localhost:61616");
// create connection

// start

connection.start();
// create session

// create queue (it will create if queue doesn't exist)
// create listener
System.out.println("____________________________________");
MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("____________________________________");
// only text type message
if (message instanceof TextMessage) {
TextMessage txt = (TextMessage) message;
try {
System.out.println("Message received =---_________--" + txt.getText());
} catch (JMSException e) {
System.out.println("error retrieving message");
System.exit(1);
}
}
}
};
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
QueueReceiver consumer = session.createReceiver(queue);
consumer.setMessageListener(messageListener);
System.in.read();
consumer.close();
session.close();
connection.close();
System.out.println("-----------closed----------");

日志控制台2015-10-16 18:30:42 DEBUG UdpTransport:382 -绑定地址:0.0.0.0/0.0.0.0:02015-10-16 18:30:42 DEBUG Connection:1:55 - SENDING: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:aymenmhidhi- pc -57832-1445013042216-1:1, clientId = ID:aymenmhidhi- pc -57832-1445013042216-0:1, clientIp = null, userName = aymenmhidhi, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true, faulttolant = false, failoverReconnect = false}2015-10-16 18:30:42 DEBUG UdpTransport:124 -发送单向从:udp://localhost:61625?trace=true@49257 to target: localhost/127.0.0.1:61625 command: ConnectionInfo {commandId = 1, responseRequired =true, connectionId = ID:aymenmhidhi- pc -57832-1445013042216-1:1, clientId = ID:aymenmhidhi- pc -57832-1445013042216-0:1, clientIp = null, userName = aymenmhidhi, password = *****, brokerPath = null, brokerMasterConnector = false, manageable =true, clientMaster =true, faulttolant = false, failoverReconnect = false}2015-10-16 18:30:42 DEBUG DefaultReplayBuffer:47 -添加命令ID: 1到重放缓冲区:org.apache.activemq.transport.reliable。DefaultReplayBuffer@7e3e2cff对象:java.nio。DirectByteBuffer[pos=129 lim=4096 cap=4096]2015-10-16 18:30:42 DEBUG CommandDatagramChannel:242 - Channel: udp://localhost:61625?trace=true@49257发送数据报:1到:localhost/127.0.0.1:616252015-10-16 18:30:42 DEBUG CommandDatagramChannel:110 - Channel: udp://localhost:61625?trace=true@49257 received from: Endpoint[name:/127.0.0.1:49258] about to process: Response {commandId = 1, responserequied = false, correlationId = 1}2015-10-16 18:30:42 DEBUG Connection:1:60 - RECEIVED: Response {commandId = 1, responserrequied = false, correlationId = 1}2015-10-16 18:30:42 DEBUG CommandDatagramChannel:110 - Channel: udp://localhost:61625?trace=true@49257 received from: Endpoint[name:/127.0.0.1:49258] about to process: ConnectionControl {commandId = 2, responseRequired = false, suspend = false, resume = false, close = false, exit = false, faultttolerance = false, 
connectedBrokers =, reconnectTo =, token = null, rebalanceConnection = false}2015-10-16 18:30:42 DEBUG Connection:1:60 - RECEIVED: ConnectionControl {commandId = 2, responserequied = false, suspend = false, resume = false, close = false, exit = false, faultttolerance = false, connectedBrokers =, reconnectTo =, token = null, rebalanceConnection = false}2015-10-16 18:30:42 DEBUG CommandDatagramChannel:110 - Channel: udp://localhost:61625?跟踪= true@49257收到:端点(名称:/127.0.0.1:49258]关于处理:BrokerInfo {commandId = 3, responseRequired = false, brokerId = ID: aymenmhidhi - pc - 57133 - 1445011516433 - 0:1, brokerURL = tcp://aymenmhidhi-PC: 61616年,slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = null, brokerName = localhost, connectionId = 0, brokerUploadUrl = null, networkProperties =零}2015-10-16 18:30:42 DEBUG Connection:1:60 - RECEIVED: BrokerInfo {commandId = 3, responseRequired = false, brokerId = ID: aymenmhidi - pc -57133-1445011516433-0:1, brokerURL = tcp://aymenmhidi - pc:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerbrokerinfo = null, brokerName = localhost, connectionId = 0, brokerUploadUrl = null, networkProperties = null}2015-10-16 18:30:42 DEBUG Connection:1:55 - SENDING: ConsumerInfo {commandId = 2, responseRequired = true, consumerId = ID: aymenmhidi - pc -57832-1445013042216-1:1:-1:1, destination = ActiveMQ.Advisory. 
tempqueue,ActiveMQ.Advisory。TempTopic, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = null, subscriptionName = null, nollocal = true, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}2015-10-16 18:30:42 DEBUG UdpTransport:124 -发送单向从:udp://localhost:61625?trace=true@49257 to target:/127.0.0.1:49258 command: ConsumerInfo {commandId = 2, responseRequired =true, consumerId = ID:aymenmhidhi-PC-57832-1445013042216-1:1:-1:1, destination = ActiveMQ.Advisory. tempqueue,ActiveMQ.Advisory。TempTopic, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = null, subscriptionName = null, nollocal = true, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}2015-10-16 18:30:42 DEBUG DefaultReplayBuffer:47 -添加命令ID: 2到重放缓冲区:org.apache.activemq.transport.reliable。DefaultReplayBuffer@7e3e2cff对象:java.nio。DirectByteBuffer[pos=155 lim=4096 cap=4096]2015-10-16 18:30:42 DEBUG CommandDatagramChannel:242 - Channel: udp://localhost:61625?trace=true@49257发送datagram: 2到:/127.0.0.1:492582015-10-16 18:30:42 DEBUG CommandDatagramChannel:110 - Channel: udp://localhost:61625?trace=true@49257 received from: Endpoint[name:/127.0.0.1:49258] about to process: Response {commandId = 4, responserequied = false, correlationId = 2}2015-10-16 18:30:42 DEBUG Connection:1:60 - RECEIVED: Response {commandId = 4, responserrequied = false, correlationId = 2}

Broker URL:brokerUrlUDP = udp://localhost:61625?trace=true

谢谢。

package eu.dedalus.x1v1.arr.arrReceiver;
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
import org.apache.log4j.Logger;
import org.springframework.scheduling.annotation.Scheduled;
/**
 * Receives text messages from an ActiveMQ queue over either a UDP or a
 * TLS connection, selected by the {@code protocol} property
 * ({@code "UDP"} or {@code "TLS"}).
 *
 * <p>All configuration (broker URLs, credentials, queue name, key/trust
 * stores) is injected through the setters before
 * {@link #startReceiving()} is called.
 */
public class ActiveMQQueuListener {
    private static final Logger log = Logger
            .getLogger(ActiveMQQueuListener.class.getName());
    private String brokerUrlTLS;
    private String brokerUrlUDP;
    private String username;
    private String password;
    private ArrListener listener;
    private String queueName;
    private String protocol;
    private String keyStore;
    private String trustStore;
    private String keyStorePassword;
    private String trustStorePassword;
    private boolean exit = false;
    private QueueConnection connection = null;
    private QueueSession session = null;
    private ActiveMQConnectionFactory connectionFactoryUDP;
    private ActiveMQSslConnectionFactory connectionFactoryTLS;
    private MessageConsumer consumer;
    private ThreadPoolExecutor executor;
    private TransportConnector connector;

    /**
     * Opens the connection for the configured protocol, registers a
     * listener on the configured queue and then blocks until a byte can
     * be read from {@code System.in}. The consumer, session and
     * connection are always closed before this method returns, whether
     * it completes normally or with an exception.
     *
     * @throws IllegalStateException    if no protocol has been set
     * @throws IllegalArgumentException if the protocol is neither
     *                                  {@code "UDP"} nor {@code "TLS"}
     * @throws Exception                any failure while connecting or
     *                                  receiving; it is printed and
     *                                  rethrown unchanged
     */
    public void startReceiving() throws Exception {
        System.out.println("------Start receiving---------");
        try {
            connection = createConnection();
            connection.start();
            session = connection.createQueueSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            // Assign the field (not a shadowing local) so exitMethod()
            // can close the consumer as well.
            consumer = session.createReceiver(queue);
            consumer.setMessageListener(createMessageListener());
            // Keep the calling thread alive until input arrives on stdin.
            System.in.read();
        } catch (Exception e) {
            System.out.println(
                    "Exception while receiving messages from the queue: " + e);
            throw e;
        } finally {
            closeResources();
        }
    }

    /**
     * Builds the queue connection for the configured protocol.
     */
    private QueueConnection createConnection() throws Exception {
        if (protocol == null) {
            throw new IllegalStateException(
                    "protocol must be set before startReceiving()");
        }
        switch (protocol) {
        case "UDP":
            connectionFactoryUDP = new ActiveMQConnectionFactory(brokerUrlUDP);
            return connectionFactoryUDP.createQueueConnection(username,
                    password);
        case "TLS":
            connectionFactoryTLS = new ActiveMQSslConnectionFactory();
            connectionFactoryTLS.setBrokerURL(brokerUrlTLS);
            connectionFactoryTLS.setTrustStore(trustStore);
            connectionFactoryTLS.setTrustStorePassword(trustStorePassword);
            connectionFactoryTLS.setKeyStore(keyStore);
            connectionFactoryTLS.setKeyStorePassword(keyStorePassword);
            return connectionFactoryTLS.createQueueConnection(username,
                    password);
        default:
            // Fail with a clear message instead of a later NPE on a null
            // connection.
            throw new IllegalArgumentException(
                    "Unsupported protocol: " + protocol);
        }
    }

    /**
     * Creates the listener that prints the body of every received
     * {@link TextMessage}; other message types are ignored.
     */
    private MessageListener createMessageListener() {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // only text type message
                if (message instanceof TextMessage) {
                    TextMessage txt = (TextMessage) message;
                    try {
                        System.out.println("Message received =---_________--"
                                + txt.getText());
                    } catch (JMSException e) {
                        System.out.println("error retrieving message");
                        System.exit(1);
                    }
                }
            }
        };
    }

    /**
     * Closes consumer, session and connection in that order. Each close
     * is attempted independently; a failure is logged and does not
     * prevent the remaining resources from being closed. Resources that
     * were never opened are skipped.
     */
    private void closeResources() {
        boolean hadResources = consumer != null || session != null
                || connection != null;
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                log.error("Failed to close consumer", e);
            }
            consumer = null;
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                log.error("Failed to close session", e);
            }
            session = null;
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close connection", e);
            }
            connection = null;
        }
        if (hadResources) {
            System.out.println("-----------closed----------");
        }
    }

    public void setListener(ArrListener listener) {
        this.listener = listener;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public ArrListener getListener() {
        return listener;
    }

    public static Logger getLog() {
        return log;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getProtocol() {
        return protocol;
    }

    /**
     * @param protocol {@code "UDP"} or {@code "TLS"}; any other value
     *                 makes {@link #startReceiving()} fail
     */
    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    public String getKeyStore() {
        return keyStore;
    }

    public void setKeyStore(String keyStore) {
        this.keyStore = keyStore;
    }

    public String getTrustStore() {
        return trustStore;
    }

    public void setTrustStore(String trustStore) {
        this.trustStore = trustStore;
    }

    public String getKeyStorePassword() {
        return keyStorePassword;
    }

    public void setKeyStorePassword(String keyStorePassword) {
        this.keyStorePassword = keyStorePassword;
    }

    public String getTrustStorePassword() {
        return trustStorePassword;
    }

    public void setTrustStorePassword(String trustStorePassword) {
        this.trustStorePassword = trustStorePassword;
    }

    public String getBrokerUrlTLS() {
        return brokerUrlTLS;
    }

    public void setBrokerUrlTLS(String brokerUrlTLS) {
        this.brokerUrlTLS = brokerUrlTLS;
    }

    public String getBrokerUrlUDP() {
        return brokerUrlUDP;
    }

    public void setBrokerUrlUDP(String brokerUrlUDP) {
        this.brokerUrlUDP = brokerUrlUDP;
    }

    /**
     * Closes any open consumer, session and connection and then
     * terminates the JVM with exit status 1. The exit happens in a
     * {@code finally} block so the JVM still terminates if cleanup
     * throws unexpectedly.
     */
    public void exitMethod() {
        try {
            closeResources();
        } finally {
            System.exit(1);
        }
    }

    public boolean isExit() {
        return exit;
    }

    public void setExit(boolean exit) {
        this.exit = exit;
    }

    public Connection getConnection() {
        return connection;
    }

    public ActiveMQConnectionFactory getConnectionFactoryUDP() {
        return connectionFactoryUDP;
    }

    public void setConnectionFactoryUDP(
            ActiveMQConnectionFactory connectionFactoryUDP) {
        this.connectionFactoryUDP = connectionFactoryUDP;
    }

    public ActiveMQSslConnectionFactory getConnectionFactoryTLS() {
        return connectionFactoryTLS;
    }

    public void setConnectionFactoryTLS(
            ActiveMQSslConnectionFactory connectionFactoryTLS) {
        this.connectionFactoryTLS = connectionFactoryTLS;
    }
}

最新更新