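I want to build an ArrayList message queue that is filled automatically with messages from an MQTT broker. For example, if three messages are published over MQTT, three messages should automatically appear in MessageQueue. My current code is: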
import java.util.ArrayList;
import TestMQTT.SubscribeSample;
import jade.core.Agent;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;

public class ServerAgent extends Agent {
    private static final long serialVersionUID = 1L;
    protected ArrayList<ACLMessage> MessageQueue = null;
    // protected ArrayList<String> myProcess = new ArrayList<>();

    public void setup() {
        SubscribeSample subscribeSample = new SubscribeSample();
        subscribeSample.subscribe("Json");
        // repeat the following actions every 5 seconds
        this.addBehaviour(new TickerBehaviour(this, 5000) {
            @Override
            protected void onTick() {
                if (subscribeSample.getMsg() != null && MessageQueue != null) {
                    MessageQueue.add(subscribeSample.getMsg());
                    System.out.println("Current MessageQueue is:" + MessageQueue);
                } else {
                    System.out.println("No message yet");
                }
                System.out.println("The whole MessageQueue are:" + MessageQueue);
            }
        });
    }
}
The SubscribeSample class is defined as follows:
public class SubscribeSample {
    public static String arrivedMessage;
    public static ArrayList<Object> ReceivedRequests = new ArrayList<>();
    public static ACLMessage msg = null;

    public static void subscribe(String TOPIC) {
        String broker = "tcp://192.168.137.100:1883";
        int qos = 1;
        String clientid = "mqtt-explorer-3260c410";
        MaintestOutput m = new MaintestOutput();
        try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);
            client.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost");
                }

                public void messageArrived(String TOPIC, MqttMessage message)
                        throws ClassNotFoundException, IOException {
                    System.out.println("======get message from [" + TOPIC + "]======");
                    System.out.println("message content:" + new String(message.getPayload()));
                    // deserialize the JSON payload into an ACLMessage
                    String Json = new String(message.getPayload());
                    Gson gson = new Gson();
                    msg = gson.fromJson(Json, ACLMessage.class);
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------" + token.isComplete());
                }
            });
            // establish the connection
            System.out.println("connecting to the broker: " + broker);
            client.connect(options);
            System.out.println("connected successfully");
            client.subscribe(TOPIC, qos);
            System.out.println("start listening" + TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
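In my ServerAgent, System.out.println("Current MessageQueue is:" + MessageQueue) only shows the message currently received, not the previous ones, and the output of System.out.println("The whole MessageQueue are:" + MessageQueue) is always null.
What I want is for every message that ServerAgent receives to be added to MessageQueue automatically, so that System.out.println("The whole MessageQueue are:" + MessageQueue) prints all of the messages.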
The list MessageQueue is always null; you never assign a new list to it.
Change the declaration to:
protected ArrayList<ACLMessage> MessageQueue = new ArrayList<>();
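To see why the initialization matters, here is a minimal sketch that reproduces the null check from onTick() without JADE or MQTT. The class QueueInitDemo and the method tick are hypothetical names used only for illustration, and String stands in for ACLMessage.

```java
import java.util.ArrayList;
import java.util.List;

public class QueueInitDemo {
    // Hypothetical stand-in for the body of onTick(); String replaces ACLMessage.
    static String tick(List<String> queue, String msg) {
        if (msg != null && queue != null) {
            queue.add(msg);
            return "Current MessageQueue is:" + queue;
        }
        // Reached whenever the queue itself is null, even if a message arrived.
        return "No message yet";
    }

    public static void main(String[] args) {
        List<String> uninitialized = null;            // mirrors "MessageQueue = null"
        List<String> initialized = new ArrayList<>(); // mirrors the corrected declaration

        System.out.println(tick(uninitialized, "m1")); // No message yet
        System.out.println(tick(initialized, "m1"));   // Current MessageQueue is:[m1]
        System.out.println(tick(initialized, "m2"));   // Current MessageQueue is:[m1, m2]
    }
}
```

With the null field, the condition is false on every tick, so nothing is ever added; with an initialized list, each call appends and earlier messages are kept.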
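Also, following Java naming conventions, it would be better to name it messageQueue; the conventions recommend camelCase for fields, variables, and parameters.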
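It is also generally better to declare the field with the abstract type, so List<ACLMessage> rather than ArrayList<ACLMessage> (this needs import java.util.List).
The recommended form is: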
protected List<ACLMessage> messageQueue = new ArrayList<>();
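As a rough illustration of what the interface type buys you, the sketch below swaps the backing implementation without touching the code that uses the field. ListTypeDemo and enqueue are hypothetical names, String again stands in for ACLMessage, and the choice of a synchronized LinkedList is only an example of a replacement, not something the original code requires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class ListTypeDemo {
    // Declared against the List interface, so any implementation can be assigned.
    protected List<String> messageQueue = new ArrayList<>();

    // Callers depend only on List, not on the concrete class behind it.
    void enqueue(String msg) {
        messageQueue.add(msg);
    }

    public static void main(String[] args) {
        ListTypeDemo demo = new ListTypeDemo();
        demo.enqueue("first");

        // Swap the implementation; enqueue() does not need to change.
        demo.messageQueue =
                Collections.synchronizedList(new LinkedList<String>(demo.messageQueue));
        demo.enqueue("second");

        System.out.println(demo.messageQueue); // [first, second]
    }
}
```

Had the field been declared as ArrayList<String>, the reassignment would not compile, and every caller would be tied to that one class.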