MQTT paho: no error when publishing a message fails



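I am using the paho.mqtt.golang library to connect to a broker and publish messages.

It works well, except that I get no error when a publish fails.

The test I am running is as follows:

  • I start the broker
  • I run the code to connect to the broker. Once connected, the code waits for input before publishing
  • I kill the broker
  • I press enter to go ahead and publish the message

I expect the token returned by the publish call to carry an error (checked with if token.Error() != nil {...}), but I get no error at all.

Here is the code of the publish function: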

func (handle Handler) Pub(ctx context.Context, topic, payload string, qos int, retained bool) error {
    token := handle.client.Publish(topic, byte(qos), retained, payload)
    go func(ctx context.Context) {
        log := logger.NewLogFromCtx(ctx)
        log.Debug("waiting for transaction to complete...")
        _ = token.Done()
        log.Debug("transaction Done!", zap.Any("token.Error()", token.Error()))
        if token.Error() != nil {
            log.Error("failed to publish MQTT message", zap.Error(token.Error()))
        }
    }(ctx)
    log := logger.NewLogFromCtx(ctx)
    log.Debug("Msg sent !")
    return nil
}

Here is the log:

Thu 27 May 17:40:25 CEST        INFO    logger/logging.go:32    logger initialized in development mode
[DEBUG][client]   Connect()
[DEBUG][store]    memorystore initialized
[DEBUG][client]   about to write new connect msg
[DEBUG][client]   socket connected to broker
[DEBUG][client]   Using MQTT 3.1.1 protocol
[DEBUG][net]      connect started
[DEBUG][net]      received connack
[DEBUG][client]   startCommsWorkers called
[DEBUG][client]   client is connected/reconnected
[DEBUG][net]      incoming started
[DEBUG][net]      startIncomingComms started
[DEBUG][net]      outgoing started
[DEBUG][net]      startComms started
[DEBUG][client]   startCommsWorkers done
[WARN][store]    memorystore wiped
[DEBUG][client]   exit startClient
Thu 27 May 17:40:25 CEST        INFO    mqtt_/client.go:68      successfully connected to MQTT broker   {"url": "tcp://127.0.0.1:1883", "in": "41.843622ms"}
press enter to publish...

At this point I am connected to the broker and the code is waiting for input; I kill the broker:

[ERROR][client]   Connect comms goroutine - error triggered EOF
[DEBUG][client]   internalConnLost called
[DEBUG][client]   stopCommsWorkers called
[DEBUG][router]   matchAndDispatch exiting
[DEBUG][pinger]   keepalive stopped
[DEBUG][client]   startCommsWorkers output redirector finished
[DEBUG][net]      outgoing waiting for an outbound message
[DEBUG][net]      outgoing waiting for an outbound message
[DEBUG][net]      outgoing comms stopping
[DEBUG][net]      startComms closing outError
[DEBUG][client]   incoming comms goroutine done
[DEBUG][client]   stopCommsWorkers waiting for workers
[DEBUG][client]   stopCommsWorkers waiting for comms
[DEBUG][client]   stopCommsWorkers done
[DEBUG][client]   internalConnLost waiting on workers
[DEBUG][client]   internalConnLost workers stopped
[DEBUG][client]   internalConnLost complete
[DEBUG]Connection lost: EOF
[DEBUG][client]   enter reconnect
[DEBUG][client]   about to write new connect msg
[DEBUG][client]   socket connected to broker
[DEBUG][client]   Using MQTT 3.1.1 protocol
[DEBUG][net]      connect started
[ERROR][net]      connect got error EOF
[ERROR][client]   Connecting to tcp://127.0.0.1:1883 CONNACK was not CONN_ACCEPTED, but rather Connection Error
[DEBUG][client]   Reconnect failed, sleeping for 1 seconds: network Error : EOF

The connection has indeed been lost. I press enter to go ahead and publish:

[DEBUG][client]   enter Publish
[DEBUG][client]   storing publish message (reconnecting), topic: just/for/test
Thu 27 May 17:40:42 CEST        DEBUG   mqtt_/client.go:84      Msg sent !
Thu 27 May 17:40:42 CEST        DEBUG   mqtt_/client.go:76      waiting for transaction to complete...
Thu 27 May 17:40:42 CEST        DEBUG   mqtt_/client.go:78      transaction Done!       {"token.Error()": null}

token.Error() is nil :( How can I check whether the publish succeeded?


If you need more details, here is my full code.

Connecting and publishing to the broker:

type Handler struct {
    client MQTT.Client
    conf   config.Configuration
}

func InitMQTT() {
    MQTT.DEBUG = lg.New(os.Stdout, "[DEBUG]", 0)
    MQTT.WARN = lg.New(os.Stdout, "[WARN]", 0)
    MQTT.CRITICAL = lg.New(os.Stdout, "[CRIT]", 0)
    MQTT.ERROR = lg.New(os.Stdout, "[ERROR]", 0)
}

func NewClient(ctx context.Context, conf config.Configuration) (Handler, error) {
    start := time.Now()
    log := logger.NewLogFromCtx(ctx)
    brokerUrl := fmt.Sprintf("tcp://%s:%s", conf.GW_MQTT_BROKER_HOST_IP, conf.GW_MQTT_BROKER_PORT)
    hostname, _ := os.Hostname()
    clientId := hostname + strconv.Itoa(time.Now().Second())
    connOpts := MQTT.NewClientOptions()
    connOpts.AddBroker(brokerUrl)
    connOpts.SetClientID(clientId)
    connOpts.SetCleanSession(true)
    handler := Handler{conf: conf}
    handler.client = MQTT.NewClient(connOpts)
    if token := handler.client.Connect(); token.Wait() && token.Error() != nil {
        log.Error("failed to connect to MQTT broker", zap.Error(token.Error()))
        return Handler{}, token.Error()
    }
    log.Info("successfully connected to MQTT broker", zap.String("url", brokerUrl), zap.Duration("in", time.Since(start)))
    return handler, nil
}

func (handle Handler) Pub(ctx context.Context, topic, payload string, qos int, retained bool) error {
    token := handle.client.Publish(topic, byte(qos), retained, payload)
    go func(ctx context.Context) {
        log := logger.NewLogFromCtx(ctx)
        log.Debug("waiting for transaction to complete...")
        _ = token.Done()
        log.Debug("transaction Done!", zap.Any("token.Error", token.Error()))
        if token.Error() != nil {
            log.Error("failed to publish MQTT message", zap.Error(token.Error()))
        }
    }(ctx)
    log := logger.NewLogFromCtx(ctx)
    log.Debug("Msg sent !")
    return nil
}

And the main function:

func main() {
    conf := config.GetConfig()
    err := logger.SetupLogging(conf.IS_DEV_ENV)
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    log := logger.NewLogFromCtx(ctx)
    mqtt.InitMQTT()
    mqttClient, _ := mqtt.NewClient(ctx, conf)
    reader := bufio.NewReader(os.Stdin)
    fmt.Print("press enter to publish...")
    // Block until the user presses enter (reads up to the newline).
    text, _ := reader.ReadString('\n')
    mqttClient.Pub(ctx, "just/for/test", "test", 2, false)
}

From the documentation:

// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker.
Wait() bool
// Done is provided for use in select statements. Simple use cases may
// use Wait or WaitTimeout.
Done() <-chan struct{}

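So _ = token.Done() really does nothing: it only fetches the channel and discards it, without waiting on it. The simplest way to wait is to use token.Wait(). If you want to use token.Done(), you need to receive from the returned channel, e.g. <-token.Done(). Done() exists because it makes things simpler when you are waiting on several events at once (for example, waiting for either a context or the MQTT operation to complete).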
