我是MQTT的新手,我有一些问题希望你们能帮助我。我正在进行一个项目,该项目要求我使用MQTT协议,该程序需要用java编写(只是一些背景信息(
MQTT客户端可以订阅特定的时间间隔吗?我需要使用eclipsepaho客户端mqttv3读取mqtt消息,并订阅特定主题一段时间(例如15分钟(,然后读取这些mqtt消息。请在下面找到我尝试过的代码。
private void initializeConnectionOptions() {
try {
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setAutomaticReconnect(false);
mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(this.caCrt, this.clientCrt, this.clientKey));
mqttConnectOptions.setKeepAliveInterval(300);
mqttConnectOptions.setConnectionTimeout(300);
mqttClient = new MqttClient("ssl://IP:port", "clientID", memoryPersistence);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String attribute = "Attribute";
JSONObject json = new JSONObject(message.toString());
LOGGER.info("json value is "+ json.toString());
if (json.toString().contains(attribute)) {
int value = json.getInt(attribute);
Long sourceTimestamp = json.getLong("sourceTimestamp");
String deviceName = json.getString("deviceName");
String deviceType = json.getString("deviceType");
if (!nodeValueWithDevice.containsKey(deviceName)) {
List<Integer> attributeValue = new ArrayList<Integer>();
if (!attributeValue.contains(value)) {
attributeValue.add(value);
}
nodeValueWithDevice.put(deviceName, attributeValue);
} else {
List<Integer> temList = nodeValueWithDevice.get(deviceName);
if (!temList.contains(value)) {
temList.add(value);
}
nodeValueWithDevice.put(deviceName, temList);
}
if (!sourceTimestampWithDevice.containsKey(deviceName)) {
List<Long> Time = new ArrayList<Long>();
if (!Time.contains(sourceTimestamp)) {
Time.add(sourceTimestamp);
}
sourceTimestampWithDevice.put(deviceName, Time);
} else {
List<Long> tempList2 = sourceTimestampWithDevice.get(deviceName);
if (!tempList2.contains(sourceTimestamp)) {
tempList2.add(sourceTimestamp);
}
sourceTimestampWithDevice.put(deviceName, tempList2);
}
LOGGER.info(" map of source time stamp is :::" + sourceTimestampWithDevice);
LOGGER.info(" map of value is :::" + nodeValueWithDevice);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
} catch (MqttException | NoSuchAlgorithmException me) {
LOGGER.error("Error while connecting to Mqtt broker. Error message {} Error code {}", me.getMessage());
}
}
public void subscription(String inputTopic) {
try {
connectToBroker();
mqttClient.subscribe(getOutputTopic(inputTopic), 1);
LOGGER.info("subscription is done::::");
} catch (Exception e) {
LOGGER.error("Error while subscribing message to broker", e.getMessage());
e.printStackTrace();
}
}
否,所有客户端都设计为在客户端连接的生存期内接收所有消息。
如果你只想在给定的持续时间内订阅,那么你就要想办法在该时间过去时收到通知,并明确断开客户端的连接。
根据v5.0和v3.1.1的MQTT规范,没有指定的方法只能订阅固定间隔的主题。但是,这可以通过您的应用程序逻辑来完成。
在您的情况下,假设您完全控制了客户端,您可以订阅某个主题,跟踪连接的时间,然后在15分钟后(或您指定的任何间隔(发送该主题的UNSUBSCRIBE数据包。