MQTT 回调客户端重连逻辑



我找不到重新连接 MQTT 回调客户端的逻辑。虽然有 onDisconnected() 方法,但我在互联网上找不到相关文档或任何示例。

我的监听器(Listener)

公共类 MyListener 实现侦听器{

    public MyListener()
    {
    }
    @Override
    public void onConnected()
    {
        System.out.println("Connected ....");
    }
    @Override
    public void onDisconnected()
    {
        System.out.println("Disconnected");
    }
    @Override
    public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack)
    {
        System.out.println("Entered Onpublish");
        try
        {
         System.out.println("received msg:" + msg);
        }
        catch (HikeException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally{
            ack.run();
        }
    }

    @Override
    public void onFailure(Throwable value)
    {
        value.printStackTrace();
    }
}

创建连接

/**
 * Drops any current connection and opens a new callback-style MQTT connection.
 *
 * <p>The new connection gets a {@link MyListener}, is asked to connect, and is
 * subscribed to the "/u" topic (at-most-once) and to the "/s" and "/a" topics
 * (at-least-once). It is then stored in {@code this.callbackConnection}.
 *
 * <p>NOTE(review): the topic names are built from {@code uid}, which is not a
 * parameter of this method (the parameter is {@code id}) - presumably a field;
 * verify it is the intended value.
 *
 * @param host  broker host name
 * @param port  broker port
 * @param id    user name sent to the broker
 * @param token password sent to the broker
 * @throws Exception if disconnecting or configuring the client fails
 */
private void createConnection(String host, int port, String id, String token) throws Exception
{
    this.disconnect();

    MQTT client = new MQTT();
    client.setHost(host, port);
    client.setUserName(id);
    client.setPassword(token);

    CallbackConnection connection = client.callbackConnection();
    connection.listener(new MyListener());
    connection.connect(new MyCallback<Void>("CONNECT"));

    // Event topic: QoS 0.
    Topic[] eventTopics = { new Topic(uid + "/u", QoS.AT_MOST_ONCE) };
    connection.subscribe(eventTopics, new MyCallback<byte[]>("EVENT SUBSCRIBE"));

    // Message topics: QoS 1.
    Topic[] messageTopics = {
        new Topic(uid + "/s", QoS.AT_LEAST_ONCE),
        new Topic(uid + "/a", QoS.AT_LEAST_ONCE)
    };
    connection.subscribe(messageTopics, new MyCallback<byte[]>("MSG SUBSCRIBE"));

    this.callbackConnection = connection;
}

我的回调

/**
 * Callback that reports the outcome of an asynchronous operation on standard
 * output, labelling each report with a caller-supplied tag.
 *
 * @param <T> type of the value delivered on success
 */
class MyCallback<T> implements Callback<T>
{
    /** Label printed with every report so the operation can be identified. */
    String tag;

    /**
     * @param tag label to print with the success/failure report
     */
    public MyCallback(String tag)
    {
        this.tag = tag;
    }

    /** Prints the tag together with the value the operation produced. */
    @Override
    public void onSuccess(T value)
    {
        StringBuilder report = new StringBuilder("TAG:");
        report.append(tag).append(" =SUCCESS value=").append(value);
        System.out.println(report.toString());
    }

    /** Prints the tag followed by the failure's stack trace. */
    @Override
    public void onFailure(Throwable value)
    {
        String report = "TAG:" + tag + "Fail";
        System.out.println(report);
        value.printStackTrace();
    }
}

我的问题是:如何实现 MQTT 客户端与服务器的重新连接逻辑?如果应该使用 onDisconnected() 方法,那么该如何使用它?

下面是我在连接丢失时实现 MQTT 重新连接的方式:启动一个线程不断尝试连接到 MQTT 服务器,连接成功后该线程即结束。

  // True while a retry thread is running. Written by the retry thread and
  // read by callers of reConnect(), hence volatile.
  // NOTE(review): the check-then-set in reConnect() is still not atomic; two
  // simultaneous callers could both start a thread.
  volatile boolean retrying = false;

  /**
   * Starts a background thread that keeps trying to reconnect to the MQTT
   * server until a connection exists, then re-subscribes to the previously
   * subscribed topics and terminates.
   *
   * <p>Does nothing if a retry thread is already running. The thread stops
   * early if it is interrupted. In every case the {@code retrying} flag is
   * cleared when the thread ends, so a later call can start a new one.
   */
  public void reConnect() {
      if (retrying) {
          return;
      }
      retrying = true;
      new Thread(new Runnable() {
          @Override
          public void run() {
              try {
                  for (;;) {
                      try {
                          if (isInetAvailable() && !mqttClient.isConnected()) {
                              if (isPasswdProtected) {
                                  // connect with MqttConnectionOptions
                                  Connect_with_passwd();
                              } else {
                                  Connect();
                              }
                              Thread.sleep(MQTT_RETRY_INTERVAL);
                          } else if (isConnected()) {
                              // Take a snapshot BEFORE clearing: assigning the
                              // list itself would alias it, so clear() would
                              // also empty the topics to re-subscribe.
                              // Presumably subscribeToTopic() records each
                              // topic again - confirm.
                              List<String> topics =
                                      new java.util.ArrayList<String>(topicsSubscribed);
                              topicsSubscribed.clear();
                              for (String topic : topics) {
                                  try {
                                      subscribeToTopic(topic);
                                  } catch (MqttException e) {
                                      // Keep going with the remaining topics,
                                      // but make the failure visible.
                                      e.printStackTrace();
                                  }
                              }
                              break;
                          } else if (!Internet.isAvailable()) {
                              Thread.sleep(INET_RETRY_INTERVEL);
                          } else {
                              // No branch applied (e.g. the two connectivity
                              // checks disagree); wait instead of spinning.
                              Thread.sleep(MQTT_RETRY_INTERVAL);
                          }
                      } catch (MqttException e) {
                          // Connect attempt failed; back off and try again.
                          Thread.sleep(MQTT_RETRY_INTERVAL);
                      }
                  }
              } catch (InterruptedException e) {
                  // Someone asked this thread to stop: restore the flag and
                  // leave the loop instead of retrying forever.
                  Thread.currentThread().interrupt();
              } finally {
                  // Always release the guard, including on unexpected
                  // runtime exceptions, so reconnects are not blocked forever.
                  retrying = false;
              }
          }
      }).start();
  }
 /**
  * Checks internet connectivity by opening a connection to the URL held in
  * {@code GOOGLE}.
  *
  * <p>The probe is bounded by connect and read timeouts so that a stalled
  * network cannot block the caller indefinitely, and an HTTP connection is
  * released once the probe is done.
  *
  * @return {@code true} if the connection could be opened, {@code false} if
  *         an {@link IOException} occurred (malformed URL, unreachable host,
  *         timeout, ...)
  */
 public static boolean isInetAvailable() {
    // Without explicit timeouts URLConnection waits forever (timeout 0).
    final int timeoutMillis = 5000;
    URLConnection conn = null;
    try {
        URL url = new URL(GOOGLE);
        conn = url.openConnection();
        conn.setConnectTimeout(timeoutMillis);
        conn.setReadTimeout(timeoutMillis);
        conn.connect();
        return true;
    } catch (IOException e) {
        return false;
    } finally {
        // Only HTTP(S) connections expose a way to release the socket.
        if (conn instanceof java.net.HttpURLConnection) {
            ((java.net.HttpURLConnection) conn).disconnect();
        }
    }
 }

我已经以这种方式实现了

        //check when network connectivity is back and implement the connection logic again
        System.out.println("Connection Lostn trying to re-connect");
        // Number of retries allowed while the network is available. The
        // original had a prose placeholder here; 5 is an arbitrary default -
        // adjust to your needs.
        final int maxRetries = 5;
        int tries=0;
        while(true){
            // Wait between attempts.
            Thread.sleep(MQTT_RETRY_INTERVAL);
            if(checkIfNetworkAvailable()&& !MQTTClient.getInstance().mqttClient.isConnected()){
                try{
                    tries++;
                    MQTTClient.getInstance().mqttClient.connect(MachineDetails.getInstance().getMACDetails(), true, (short) 1000);
                    //register handler
                    MQTTClient.getInstance().mqttClient.registerAdvancedHandler(ApplicationPublishHandler.getInstance());
                    String[] topics={Constants.PUBLIC_BROADCAST_TOPIC};
                    int[] qos={1};
                    MQTTClient.getInstance().mqttClient.subscribe(topics, qos);
                    // Reconnected and re-subscribed: stop retrying. Without
                    // this the loop kept sleeping forever after a success.
                    break;
                }catch(Exception e){
                    //Service down  and give an alert
                    // Make the failed attempt visible instead of dropping it.
                    e.printStackTrace();
                }
                // Give up once the retry budget is used up.
                if(tries>maxRetries){
                    break;
                }
            }
        }

    /**
     * Reports whether the configured host name can be resolved, which is used
     * as the indicator that the network is available.
     *
     * @return {@code true} if the lookup succeeds, {@code false} if it fails
     *         with an {@link UnknownHostException}
     */
    private boolean checkIfNetworkAvailable() {
        boolean resolved;
        try {
            // Only the success of the lookup matters; the address is discarded.
            InetAddress.getByName("<<your host name>>");
            resolved = true;
        } catch (UnknownHostException lookupFailure) {
            resolved = false;
        }
        return resolved;
    }

最新更新