具有qos 2的发布者发布会从broker或subscriber获得确认



我对qos有点困惑,我读到关于qos的文章如果qos设置为2,那么broker/client将使用四步握手只传递一次消息。

因此qos 2确认消息是在broker上发布的,而不是由订阅者(客户端)接收的。或消息由订阅者接收或

对于确认,我们应该建立一个类似于发布者的应用程序,发布者将发布主题为"DATA"的消息,并订阅主题为"ACK"的消息。订阅者需要发布主题"ACK"上的确认,即在主题"DATA"上收到消息

我创建了一个用于发布数据的java类和另一个用于订阅的类出版商

在下面的代码中,我试图以qos 2发布,在deliveryComplete函数中,当我尝试使用qos 0时,我在尝试getMessage()时遇到异常。

public class PublishMe implements MqttCallback{
MqttClient myClient;
MqttClient myClientPublish;
MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
static Boolean msgACK=false;    
public static void main(String[] args) {
PublishMe smc = new PublishMe();
smc.runClient();
}
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String string, MqttMessage message) throws Exception {
System.out.println("-------------------------------------------------");
System.out.println("| Topic:" + string);
System.out.println("| Message: " + new String(message.getPayload()));
System.out.println("-------------------------------------------------");
}
/**
* 
* deliveryComplete
* This callback is invoked when a message published by this client
* is successfully received by the broker.
* 
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try{
System.out.println("Message delivered successfully to topic : "" + token.getMessage().toString() + "".");
}catch(Exception ex){
System.out.println(ex.getCause()+" -- "+ex.getLocalizedMessage()+" -- "+ex.getMessage()+" -- " );      
}
}
public void runClient() {
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(0);
connOptPublish= new MqttConnectOptions();
connOptPublish.setCleanSession(false);
connOptPublish.setKeepAliveInterval(0);
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, "pahomqttpublish11");
myClient.setCallback(this);
myClient.connect(connOpt);
myClientPublish= new MqttClient(BROKER_URL, "pahomqttpublish42");
myClientPublish.setCallback(this);
myClientPublish.connect(connOptPublish);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
String myTopic = "sample";
//                String myTopic = "receiveDATA2";
MqttTopic topic = myClientPublish.getTopic(myTopic);
// publish messages if publisher
if (publisher) {
int i=1;
while(true){
String pubMsg = "sample msg "+i;
MqttMessage message = new MqttMessage(pubMsg.getBytes());
System.out.println(message);
message.setQos(2);
message.setRetained(false);
// Publish the message
MqttDeliveryToken token = null;
try {
// publish message to broker
token = topic.publish(message);
// Wait until the message has been delivered to the broker
token.waitForCompletion();
msgACK=false;
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}           
}
}

}

下面是用户

public class Mqttsample implements MqttCallback{
MqttClient myClient;
MqttClient myClientPublish;
MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
// the following two flags control whether this example is a publisher, a subscriber or both
static final Boolean subscriber = true;
static final Boolean publisher = true;
public static void main(String[] args) {

Mqttsample smc = new Mqttsample();
smc.runClient();
}
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
// code to reconnect to the broker would go here if desired
}
@Override
public void messageArrived(String string, MqttMessage message) throws Exception {
//throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
System.out.println("| Topic:" + string+"| Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try{
System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
}
catch(Exception ex ){
System.out.println("delivery Error "+ex.getMessage());
}
}

public void runClient() {
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(0);
connOptPublish= new MqttConnectOptions();
connOptPublish.setCleanSession(false);
connOptPublish.setKeepAliveInterval(0);
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, "pahomqttpublish");
myClient.setCallback(this);
myClient.connect(connOpt);
myClientPublish= new MqttClient(BROKER_URL, "pahomqttsubscribe");
myClientPublish.setCallback(this);
myClientPublish.connect(connOptPublish);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);

// subscribe to topic if subscriber
if (subscriber) {
try {
//String myTopicACK = M2MIO_DOMAIN + "/" + "ACK" + "/" + M2MIO_THING;
String myTopicACK = "sample";
// MqttTopic topicACK = myClient.getTopic(myTopicACK);
int subQoS = 2;
myClient.subscribe(myTopicACK, subQoS);
} catch (Exception e) {
e.printStackTrace();
}
}
//                 

}

}

我如何确保订阅者已经收到消息,我需要在发布者代码中实现什么。

http://www.eclipse.org/paho/files/mqttdoc/Cclient/qos.html从上面的链接

QoS2,精确一次:消息总是精确地传递一次。消息必须存储在发送方本地,直到发送方收到接收方已发布消息的确认为止。将存储该消息,以防必须再次发送该消息。QoS2是最安全但最慢的传输模式。

正如您所确定的,较高的QOS级别仅描述客户端(发布者或订阅者)和代理之间的消息传递,而不是端到端的发布者到订阅者。

这是非常深思熟虑的,因为作为发布/子协议,无法知道一个主题可能有多少订阅者。可以有0到n之间的任何数字。此外,发布者和订阅者可以在不同的QOS级别与主题交互(发布者可以在QOS 2发布,订阅者可以在QOS 0订阅)。消息也可以作为保留消息发布,这样最后一条保留消息将始终传递给新订阅的客户端。

客户端上满足QOS契约的所有存储都应该由您正在使用的MQTT库(在本例中为Paho)处理

deliveryComplete回调仅表示发布者已完成向代理发送消息。此外,文档说,如果消息已经传递,token.getMessage()将返回null,这将解释您提到的异常(我不得不在这里猜测,因为您没有包括异常)。

如果您的应用程序体系结构确实需要对消息进行端到端确认,那么您将需要实现与您所描述的类似的东西。但是,为了确保它正常工作,你应该在消息的有效负载中包括一个消息id,确认消息应该包括这一点,并可能以某种方式识别哪个订户正在回复,以确保你知道是谁收到了消息。我使用这样的东西的唯一原因是,如果确认消息需要时间。如果时间与此无关,则考虑使用持久会话来确保消息在重新连接时传递到订阅客户端(如果在发布时已断开连接)。