我开始使用MQTT,并且很难处理一个不可靠的网络。我正在使用Paho Java客户端(Groovy)将消息发布给遥远的蚊子经纪人。
当经纪人无法实现时,有没有办法让Paho客户端坚持消息并自动重新连接到经纪人并发布本地存储的消息?我必须自己处理所有内容,例如使用本地经纪人吗?
这是我的客户端构建代码
String persistenceDir = config['persistence-dir'] ?: System.getProperty('java.io.tmpdir') def persistence = new MqttDefaultFilePersistence(persistenceDir) client = new MqttAsyncClient(uri, clientId, persistence) client.setCallback(this) options = new MqttConnectOptions() if (config.password) { options.setPassword(config.password as char[]) options.setUserName(config.user) } options.setCleanSession(false) client.connect(options)
和我的发布代码
def message = new MqttMessage(Json.encode(outgoingMessage).getBytes()) try { client?.connect(options) def topic = client.getTopic('processMsg') message.setQos(1) def token = topic.publish(message) if (client) { client.disconnect() }
谢谢
paho客户端仅在连接到经纪人时才会持续到飞行中。
通常,当连接性问题开始到达时,您会看到消息超时弹出
- 时机等待服务器的响应(32000)
那时,消息仍将持续存在。
但是,当连接丢失时,您开始看到此
- 客户未连接(32104)
您应该假设该消息尚未由Paho持续。
您可以在org.eclipse.paho.client.mqttv3.internal.ClientComms
中调试此:
/**
* Sends a message to the broker if in connected state, but only waits for the message to be
* stored, before returning.
*/
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "sendNoWait";
if (isConnected() ||
(!isConnected() && message instanceof MqttConnect) ||
(isDisconnecting() && message instanceof MqttDisconnect)) {
this.internalSend(message, token);
} else {
//@TRACE 208=failed: not connected
log.fine(className, methodName, "208");
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
internalSend
将持久持续此消息,但仅当它连接到经纪人时。
还考虑到PAHO可以处理最大数量的机上消息。如果超出它也将决定不持续消息。
您只需设置本地经纪人,并用远程代理桥接该经纪人。这样,您可以在本地排队所有消息,当远程经纪人返回在线时,所有消息都可以交付。
是...AF89-22D65FFEE070)