如何检查是否失去与MQTT经纪人的联系



我正在与paho pkg尝试建立Golang的MQTT Sub Client,当经纪人断开连接时,我对客户有问题,我认为应该丢失信息,但这不是发生,如果我启动经纪人,MQTT子客户端无法获得MQTT Pub客户端发送的消息。

为什么会发生这种情况以及如何解决?

代码

package main
import (
    "fmt"
    "os"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)
var (
    broker                     = "tcp://localhost:1883"
    f      mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
        fmt.Printf("TOPIC: %sn", msg.Topic())
        fmt.Printf("MSG: %sn", msg.Payload())
    }
)
func main() {
    //create a ClientOptions
    opts := mqtt.NewClientOptions().AddBroker(broker)
    opts.SetClientID("group-one")
    opts.SetDefaultPublishHandler(f)
    //create and start a client using the above ClientOptions
    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
    for {
    }
}

分配自定义OnConnectionLostHandler以捕获连接丢失的事件,因此您可以在客户端丢失连接时执行其他操作。如果将AutoReconnect选项设置为true(这是默认行为(,则客户端将在连接丢失后自动重新连接到代理。请注意,连接丢失后,您的订阅状态/信息可能无法由经纪人保存,因此您将无法收到任何消息。要处理此问题,请将主题订阅移至OnConnect处理程序。以下是实现的示例:

package main
import (
    "fmt"
    "os"
    "time"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)
func messageHandler(c mqtt.Client, msg mqtt.Message) {
    fmt.Printf("TOPIC: %sn", msg.Topic())
    fmt.Printf("MSG: %sn", msg.Payload())
}
func connLostHandler(c mqtt.Client, err error) {
    fmt.Printf("Connection lost, reason: %vn", err)
    //Perform additional action...
}
func main() {
    //create a ClientOptions
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883").
        SetClientID("group-one").
        SetDefaultPublishHandler(messageHandler).
        SetConnectionLostHandler(connLostHandler)
    //set OnConnect handler as anonymous function
    //after connected, subscribe to topic
    opts.OnConnect = func(c mqtt.Client) {
        fmt.Printf("Client connected, subscribing to: test/topicn")
        //Subscribe here, otherwise after connection lost, 
        //you may not receive any message
        if token := c.Subscribe("test/topic", 0, nil); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
            os.Exit(1)
        }
    }
    //create and start a client using the above ClientOptions
    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    for {
        //Lazy...
        time.Sleep(500 * time.Millisecond)
    }
}

最新更新