使用单个连接实例实现Eclipse MQTT Android客户端



我在应用程序中使用Eclipse Paho Android MQTT服务。我能够订阅主题,并向MQTT代理(broker)发布消息。我的应用程序中有多个Activity,每个Activity启动时都会通过mqttAndroidClient.connect(null, new IMqttActionListener() {})连接到代理,并在mqttAndroidClient.setCallback(new MqttCallback() {})中接收响应。

我的问题:

  1. 这是实施Android MQTT服务的正确方法吗?
  2. 是否有一种方法可以在整个应用程序中使用相同的连接和回调实例?

一种“更好”的方法是创建一个负责连接/重新连接到MQTT代理的Service。

我创建了自己的名为MqttConnectionManagerService的服务,该服务负责维护并管理与MQTT代理(broker)的连接。

此解决方案的关键特征:

  1. 服务只要活着,就可以维护一个实例。
  2. 如果服务被杀死,Android会重新启动它(因为onStartCommand()返回了START_STICKY)。
  3. 设备开机(启动完成)时可以自动启动该服务。
  4. 服务在后台运行,并始终连接以接收通知。
  5. 如果服务还活着,则再次调用startService(..)将触发其onStartCommand()方法(而不是onCreate())。在此方法中,我们只需检查客户端是否已连接到MQTT代理,并在需要时连接/重新连接。

示例代码:

MqttConnectionManagerService

/**
 * Started service that owns the MQTT client and its connect options and
 * (re)connects the client every time the service is started.
 *
 * <p>The client and options are created once in {@link #onCreate()}; each
 * {@code startService(..)} call lands in {@link #onStartCommand} and only
 * connects when the client is not already connected.
 */
public class MqttConnectionManagerService extends Service {
    private MqttAndroidClient client;
    private MqttConnectOptions options;

    /** Builds the connect options and the client once per service instance. */
    @Override
    public void onCreate() {
        super.onCreate();
        options = createMqttConnectOptions();
        client = createMqttAndroidClient();
    }

    /**
     * Ensures the client is connected.
     *
     * @param intent  the start intent; not used here
     * @param flags   start flags; not used here
     * @param startId start request id; not used here
     * @return {@code START_STICKY}
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        this.connect(client, options);
        return START_STICKY;
    }

    /**
     * Required by {@code Service} (the method is abstract there). This service is
     * only ever started, never bound, so no binder is provided.
     *
     * @param intent the bind intent; ignored
     * @return always {@code null}
     */
    @Override
    public android.os.IBinder onBind(Intent intent) {
        return null;
    }

    private MqttConnectOptions createMqttConnectOptions() {
        //create and return options
    }
    private MqttAndroidClient createMqttAndroidClient() {
        //create and return client
    }

    /**
     * Connects {@code client} with {@code options} unless it is already connected.
     *
     * @param client  the client to connect
     * @param options the options passed to the connect call
     */
    public void connect(final MqttAndroidClient client, MqttConnectOptions options) {
        try {
            if (!client.isConnected()) {
                // Register the message callback BEFORE connecting so nothing that
                // arrives right after the connection is established is missed.
                client.setCallback(new MqttCallback() {..});
                // Hand the listener to connect() itself instead of attaching it to the
                // returned token afterwards, which could race with an early completion.
                //on successful connection, publish or subscribe as usual
                client.connect(options, null, new IMqttActionListener() {..});
            }
        } catch (MqttException e) {
            //handle e
        }
    }
}

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?>
<!-- Manifest excerpt: declares the boot permission, the boot receiver and the two services used for MQTT. -->
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="...">
    <!-- Permission required to receive the BOOT_COMPLETED broadcast -->
    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">
        <!-- Activities go here (omitted from this excerpt) -->
        <!-- BroadcastReceiver that starts MqttConnectionManagerService on device boot;
             its intent filter matches only the BOOT_COMPLETED action -->
        <receiver android:name=".MqttServiceStartReceiver">
            <intent-filter>
                <action android:name="android.intent.action.BOOT_COMPLETED" />
            </intent-filter>
        </receiver>
        <!-- Services required for using MQTT: the Paho library's MqttService and the
             application's own MqttConnectionManagerService -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service android:name=".MqttConnectionManagerService" />
    </application>
</manifest>

MqttServiceStartReceiver

/**
 * Broadcast receiver whose only job is to start
 * {@link MqttConnectionManagerService} whenever a broadcast is delivered to it.
 */
public class MqttServiceStartReceiver extends BroadcastReceiver {

    /**
     * Starts the connection-manager service.
     *
     * @param context context used to build the explicit intent and start the service
     * @param intent  the received broadcast; it is not inspected
     */
    @Override
    public void onReceive(Context context, Intent intent) {
        final Intent startConnectionManager =
                new Intent(context, MqttConnectionManagerService.class);
        context.startService(startConnectionManager);
    }
}

在您的Activity的onResume()中调用:

startService(new Intent(this, MqttConnectionManagerService.class));

这是我对MQTT客户端的Singleton实现:

    /**
     * Singleton wrapper around one {@code MqttAndroidClient}.
     *
     * <p>The first {@link #getInstance(Context)} call creates the instance, reads the
     * client id and broker address from {@code STBPreferences} and starts connecting.
     * After a successful connect the client subscribes to
     * {@code clientId + Constants.MQTT_SUB_TOPIC_APPEND}; messages are published to
     * {@code clientId + Constants.MQTT_PUB_TOPIC_APPEND}.
     *
     * <p>All state is static and lazy initialisation is not synchronised.
     */
    public class MQTTConnection extends ServerConnectionImpl {
        private static final String TAG = MQTTConnection.class.getSimpleName();
        /** Charset used for every payload conversion, stated explicitly instead of relying on the default. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
        /** Shared JSON (de)serialiser; avoids building a new one per message. */
        private static final Gson GSON = new Gson();
        private static Context mContext;
        private static MqttAndroidClient mqttAndroidClient;
        private static String clientId;
        private static MQTTConnection sMqttConnection = null;

        private MQTTConnection() {
        }

        /**
         * Returns the shared instance, creating it and starting the broker connection
         * on first use.
         *
         * @param context any context; only its application context is retained
         * @return the shared instance
         */
        public static MQTTConnection getInstance(Context context) {
            if (null == sMqttConnection) {
                // Keep the application context only: a static reference to an
                // Activity would keep that Activity alive for the whole process.
                mContext = context.getApplicationContext();
                init();
            }
            return sMqttConnection;
        }

        /**
         * Disconnects the current client (if an instance exists) and then builds a
         * fresh instance and connection. Failures are logged, never thrown.
         */
        public static void reconnectToBroker() {
            if (sMqttConnection != null) {
                try {
                    sMqttConnection.disconnect();
                } catch (RuntimeException e) {
                    // A failed disconnect must not prevent the re-initialisation below.
                    e.printStackTrace();
                }
            }
            try {
                init();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Creates the instance, loads the client id and starts connecting. */
        private static void init() {
            sMqttConnection = new MQTTConnection();
            setClientId();
            connectToBroker();
        }

        /** Topic this client subscribes to (and unsubscribes from). */
        private static String subscriptionTopic() {
            return clientId + Constants.MQTT_SUB_TOPIC_APPEND;
        }

        /**
         * Builds the client for the configured broker URI, installs the callback and
         * issues the asynchronous connect.
         */
        private static void connectToBroker() {
            String ip = STBPreferences.getInstance(mContext).getString(Constants.KEY_MQTT_SERVER_IP, null);
            if (ip == null) {
                // No address stored in preferences: fall back to the built-in default.
                ip = Constants.MQTT_SERVER_IP;
            }
            final String uri = Constants.MQTT_URI_PREFIX + ip + ":" + Constants.MQTT_SERVER_PORT;
            mqttAndroidClient = new MqttAndroidClient(mContext.getApplicationContext(), uri, clientId);
            mqttAndroidClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    if (reconnect) {
                        LogUtil.d(TAG, "Reconnected to : " + serverURI);
                        // Re-subscribe after an automatic reconnect. cleanSession is
                        // false below, so the broker may still hold the subscription;
                        // subscribing again is a harmless safeguard.
                        subscribeToTopic();
                    } else {
                        LogUtil.d(TAG, "Connected to: " + serverURI);
                    }
                }

                @Override
                public void connectionLost(Throwable cause) {
                    LogUtil.d(TAG, "The Connection was lost.");
                }

                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    String messageReceived = new String(mqttMessage.getPayload(), UTF_8);
                    LogUtil.d(TAG, "Incoming message: " + messageReceived);
                    try {
                        Message message = GSON.fromJson(messageReceived, Message.class);
                        // Here you can send message to listeners for processing
                    } catch (JsonSyntaxException e) {
                        // Something wrong with message format json
                        e.printStackTrace();
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    LogUtil.d(TAG, "Message delivered");
                }
            });
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setAutomaticReconnect(true);
            mqttConnectOptions.setCleanSession(false);
            try {
                mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        LogUtil.d(TAG, "connect onSuccess");
                        // Buffer up to 100 messages in memory while disconnected.
                        DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                        disconnectedBufferOptions.setBufferEnabled(true);
                        disconnectedBufferOptions.setBufferSize(100);
                        disconnectedBufferOptions.setPersistBuffer(false);
                        disconnectedBufferOptions.setDeleteOldestMessages(false);
                        mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                        subscribeToTopic();
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        LogUtil.d(TAG, "Failed to connect to: " + uri);
                    }
                });
            } catch (MqttException ex) {
                ex.printStackTrace();
            }
        }

        /**
         * Serialises {@code publishMessage} to JSON and publishes it to
         * {@code clientId + Constants.MQTT_PUB_TOPIC_APPEND}. Failures are logged.
         *
         * @param publishMessage the message to send
         */
        public void publish(Message publishMessage) {
            if (mqttAndroidClient == null) {
                // No client exists yet: rebuild the connection. This message is
                // dropped, as it was when the missing client surfaced as an exception.
                LogUtil.d(TAG, "Error Publishing: client not initialised");
                init();
                return;
            }
            try {
                String replyJson = GSON.toJson(publishMessage);
                String publishTopic = clientId + Constants.MQTT_PUB_TOPIC_APPEND;
                MqttMessage message = new MqttMessage();
                message.setPayload(replyJson.getBytes(UTF_8));
                mqttAndroidClient.publish(publishTopic, message);
                LogUtil.d(TAG, "Message Published");
            } catch (MqttException e) {
                LogUtil.d(TAG, "Error Publishing: " + e.getMessage());
                e.printStackTrace();
            } catch (NullPointerException e) {
                // NOTE(review): kept as best-effort protection; presumably the client
                // can throw this when used before it is fully set up - confirm.
                LogUtil.d(TAG, "Error Publishing: " + e);
                e.printStackTrace();
            }
        }

        /** Subscribes to {@link #subscriptionTopic()} with QoS 0 and logs the outcome. */
        private static void subscribeToTopic() {
            try {
                mqttAndroidClient.subscribe(subscriptionTopic(), 0, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        LogUtil.d(TAG, "subscribe onSuccess");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        LogUtil.d(TAG, "Failed to subscribe");
                    }
                });
            } catch (MqttException ex) {
                System.err.println("Exception whilst subscribing");
                ex.printStackTrace();
            }
        }

        /**
         * Removes the subscription created by {@link #subscribeToTopic()}.
         *
         * <p>This previously unsubscribed from the fixed topic {@code "foo/bar"},
         * which this class never subscribes to, leaving the real subscription in place.
         */
        public void unSubscribe() {
            LogUtil.d(TAG, "unSubscribe");
            final String topic = subscriptionTopic();
            try {
                IMqttToken unsubToken = mqttAndroidClient.unsubscribe(topic);
                unsubToken.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        // The subscription was successfully removed from the client
                        LogUtil.d(TAG, "unSubscribe onSuccess");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken,
                                          Throwable exception) {
                        LogUtil.d(TAG, "unSubscribe onFailure");
                    }
                });
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /** Requests an asynchronous disconnect and logs the outcome. */
        public void disconnect() {
            LogUtil.d(TAG, "disconnect");
            try {
                IMqttToken disconToken = mqttAndroidClient.disconnect();
                disconToken.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        // we are now successfully disconnected
                        LogUtil.d(TAG, "disconnect onSuccess");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken,
                                          Throwable exception) {
                        LogUtil.d(TAG, "disconnect onFailure");
                        // something went wrong, but probably we are disconnected anyway
                    }
                });
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /** Loads the client id from the {@code Constants.STB_SERIAL_NO} preference. */
        private static void setClientId() {
            String srNo = STBPreferences.getInstance(mContext).getString(Constants.STB_SERIAL_NO, null);
            clientId = srNo;
        }

        /** Returns the client id, loading it from preferences first if it is unset. */
        private String getClientId() {
            if (clientId == null) {
                setClientId();
            }
            return clientId;
        }

        /** Delegates to {@code NetworkUtility.isNetworkAvailable} with the stored context. */
        @Override
        public boolean isInternetEnabled() {
            return NetworkUtility.isNetworkAvailable(mContext);
        }

        /** Publishes {@code message}; see {@link #publish(Message)}. */
        @Override
        public void sendMessage(Message message) {
            publish(message);
        }

        /** Re-establishes the broker connection; see {@link #reconnectToBroker()}. */
        @Override
        public void reconnect() {
            reconnectToBroker();
        }
    }

这是消息模型类,请根据您的需要修改它。

/**
 * JSON message model with three serialised fields: {@code type}, {@code name}
 * and {@code data}.
 */
public class Message {
    /**
     * Type of data
     */
    @SerializedName("type")
    private String type;
    /**
     * Name of component
     */
    @SerializedName("name")
    private String name;
    /**
     * Payload; declared as {@code Object}, so any value may be stored here.
     */
    @Expose
    @SerializedName("data")
    private Object data;

    /**
     * @param type type of data
     * @param name name of component
     * @param data payload; may be {@code null}
     */
    public Message(String type, String name, Object data) {
        this.type = type;
        this.name = name;
        this.data = data;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    /**
     * Returns the three fields on separate lines.
     *
     * <p>The separator is a real line break ({@code "\n"}), not the stray letter
     * {@code "n"}, and a {@code null} payload is rendered as {@code "null"}
     * instead of throwing.
     */
    @Override
    public String toString() {
        return "Message{" +
                "type=" + type + "\n" +
                "name=" + name + "\n" +
                "data=" + data +
                '}';
    }
}

在您的活动中获取MQTT实例

// The singleton is obtained from MQTTConnection's own factory method.
MQTTConnection mqttConnection = MQTTConnection.getInstance(mContext);

发布消息

// Publish through the MQTTConnection instance held in mqttConnection.
mqttConnection.sendMessage(new Message( ... ));

编辑1:以下是供参考的ServerConnectionImpl类。

/**
 * Base {@code ServerConnection} implementation: every connection operation is a
 * no-op or returns a neutral value, and the class keeps the shared list of
 * listeners that {@link #notifyListeners} dispatches to.
 */
public class ServerConnectionImpl extends ConfigurationChangeListenerImpl implements ServerConnection {
    /**
     * Logging TAG
     */
    private static final String TAG = ServerConnectionImpl.class.getSimpleName();
    /**
     * All listeners registered for received messages.
     *
     * <p>The list is static, i.e. shared by every instance. A copy-on-write list
     * lets {@link #notifyListeners} iterate a stable snapshot while listeners are
     * added or removed, including by a listener during its own callback. The
     * earlier instance-level {@code synchronized} locked a different monitor per
     * instance and did not cover iteration.
     */
    private static final java.util.List<ConfigurationChangeListenerImpl> sConfigurationChangeListeners =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Default: reports no connectivity. */
    @Override
    public boolean isInternetEnabled() {
        return false;
    }

    /** Default: no subscription details available. */
    @Override
    public ResponseData getSubscriptionDetails(String serialNumber) {
        return null;
    }

    /** Default: does nothing. */
    @Override
    public void sendMessage(Message message, WebSocket webSocket) {
    }

    /** Default: does nothing. */
    @Override
    public void sendMessage(Message message) {
    }

    /** Default: does nothing. */
    @Override
    public void sendMessageToAll(Message message) {
    }

    /**
     * Invokes the callback matching {@code config} on every registered listener.
     * Unknown {@code config} values are ignored.
     *
     * @param config  one of the {@code Configs} constants handled below
     * @param message forwarded to listeners only for {@code Configs.MESSAGE}
     * @param wc      forwarded to listeners only for {@code Configs.MESSAGE}
     */
    //@Override
    public static void notifyListeners(int config, Message message, WebSocket wc) {
        switch (config) {
            case Configs.CAMERA: {
                for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                    l.onCameraServerChanged();
                }
                break;
            }
            case Configs.GESTURE: {
                for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                    l.onGestureCommandServerChanged();
                }
                break;
            }
            case Configs.MOTION_SENSOR: {
                for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                    l.onMotionSensorServerChanged();
                }
                break;
            }
            case Configs.MESSAGE: {
                for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                    l.onMessageReceived(message, wc);
                }
                break;
            }
        }
    }

    /**
     * Adds listener to listen to messages.
     *
     * @param listener the listener to register; must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public void addListener(ConfigurationChangeListenerImpl listener) {
        LogUtil.d(TAG, "addListener()");
        if (listener == null) {
            throw new IllegalArgumentException("Invalid listener " + listener);
        }
        sConfigurationChangeListeners.add(listener);
    }

    /**
     * Removes the listener
     *
     * @param listener the listener to unregister; must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public void removeListener(ConfigurationChangeListenerImpl listener) {
        LogUtil.d(TAG, "removeListener()");
        if (listener == null) {
            throw new IllegalArgumentException("Invalid listener " + listener);
        }
        sConfigurationChangeListeners.remove(listener);
    }

    /** Default: does nothing. */
    @Override
    public void updateState() {
    }

    /** Default: does nothing. */
    @Override
    public void reconnect() {
    }

}

您可以将自己的实现用于ServerConnectionImpl类。

最新更新