谷歌云物联网 - 多个配置消息



在我的代码中,我订阅了 3 个不同的主题:

/devices/{}/config
/devices/{}/events
/devices/{}/state

如果我没有订阅 /devices/{}/config,我就不会收到任何配置消息,这没有问题。但是一旦订阅了 /devices/{}/config,我每订阅一个主题,就会收到一条配置消息。

例:

订阅了"/devices/{}/config"和"/devices/{}/events",我收到 2 条配置消息。

订阅了"/devices/{}/config"和"/devices/{}/state",我收到 2 条配置消息。

订阅了"/devices/{}/config"、"/devices/{}/state"和"/devices/{}/events",我收到了 3 条配置消息。

订阅了"/devices/{}/events"和"/devices/{}/state",我收到0条配置消息。

这会导致 Cloud IoT Core 报出如下 MQTT 错误:

无法更新设备"xxxxx"。设备状态只能每 1 秒更新一次。

事实上,我想要并且只需要一条配置消息。我做错了什么?

这是我的代码:

# [START iot_mqtt_includes]
import argparse
import datetime
import os
import random
import ssl
import time
import log
import updateConfig
import jwt
import paho.mqtt.client as mqtt
import payload
# [END iot_mqtt_includes]
# The initial backoff time after a disconnection occurs, in seconds.
# This is mutable state rather than a constant: main() doubles it after each
# reconnect wait and on_connect() resets it to 1.
minimum_backoff_time = 1
# The maximum backoff time before giving up, in seconds. main() stops its
# loop once minimum_backoff_time has grown past this value.
MAXIMUM_BACKOFF_TIME = 32
# Whether to wait with exponential backoff before reconnecting. Set to True
# by on_disconnect() and back to False by on_connect().
should_backoff = False

# [START iot_mqtt_jwt]
def create_jwt(project_id, private_key_file, algorithm):
    """Create a signed JWT used as the MQTT password for the device.

    Args:
        project_id: GCP project id; stored in the token's 'aud' claim.
        private_key_file: Path to the private key file used to sign the token.
        algorithm: Signing algorithm, passed through to ``jwt.encode``.

    Returns:
        The value returned by ``jwt.encode`` for a token that expires
        60 minutes after it was issued.

    Raises:
        IOError/OSError: If the private key file cannot be opened or read.
    """
    # Take a single timestamp so 'iat' and 'exp' are exactly 60 minutes apart.
    issued_at = datetime.datetime.utcnow()
    token = {
        # The time that the token was issued at.
        'iat': issued_at,
        # The time the token expires.
        'exp': issued_at + datetime.timedelta(minutes=60),
        # The audience field should always be set to the GCP project id.
        'aud': project_id,
    }
    # Read the private key file. The ``with`` block closes the file on exit,
    # so no explicit close() call is needed.
    with open(private_key_file, 'r') as f:
        private_key = f.read()
    return jwt.encode(token, private_key, algorithm=algorithm)
# [END iot_mqtt_jwt]

# [START iot_mqtt_config]
def error_str(rc):
    """Render a Paho return code as '<code>: <description>'.

    Args:
        rc: The Paho error/return code.

    Returns:
        A string combining ``rc`` with the text from ``mqtt.error_string``.
    """
    description = mqtt.error_string(rc)
    return '{}: {}'.format(rc, description)

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Paho callback invoked when a CONNACK is received for this device.

    Logs the human-readable CONNACK result and then clears the module-level
    backoff state. The reset happens for every ``rc`` value; the code does
    not distinguish accepted from refused connections.

    Args:
        unused_client: The Paho client instance (ignored).
        unused_userdata: User data registered with the client (ignored).
        unused_flags: Broker response flags (ignored).
        rc: Connection result code, rendered via ``mqtt.connack_string``.
    """
    global should_backoff, minimum_backoff_time

    connack_text = mqtt.connack_string(rc)
    log.append_log('ao_conectar - ' + connack_text)

    # Connection attempt answered: stop backing off and restart the
    # exponential backoff sequence from its initial value.
    should_backoff, minimum_backoff_time = False, 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback invoked when the device disconnects.

    Logs the disconnect reason and flags that the main loop must wait with
    exponential backoff before reconnecting.

    Args:
        unused_client: The Paho client instance (ignored).
        unused_userdata: User data registered with the client (ignored).
        rc: Paho disconnect result code, rendered via ``error_str``.
    """
    global should_backoff

    # The module is imported as ``import log``, so the helper must be called
    # through the module; a bare ``append_log`` raises NameError.
    log.append_log('ao_desconectar - ' + error_str(rc))
    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback fired when a message has been sent to the broker.

    Intentionally a no-op: all three arguments are ignored.
    """
    return None
def on_message(unused_client, unused_userdata, message):
    """Paho callback invoked when a message arrives on a subscription.

    Converts the message payload to a string and hands it to
    ``updateConfig.update``. The result of that call is not used.

    Args:
        unused_client: The Paho client instance (ignored).
        unused_userdata: User data registered with the client (ignored).
        message: The received message; only its ``payload`` is read.
    """
    # NOTE(review): if ``payload`` is a bytes object on Python 3, str() yields
    # the "b'...'" representation rather than decoded text. Left unchanged
    # because updateConfig.update may rely on the current format - confirm,
    # then consider message.payload.decode('utf-8').
    config = str(message.payload)
    updateConfig.update(config)
def get_client(
        project_id, cloud_region, registry_id, device_id, private_key_file,
        algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
    """Build an MQTT client, connect it and subscribe to the config topic.

    Args:
        project_id: GCP project id (part of the client id and JWT audience).
        cloud_region: Cloud region (part of the client id).
        registry_id: Registry id (part of the client id).
        device_id: Device id (part of the client id and the config topic).
        private_key_file: Path to the private key used to sign the JWT.
        algorithm: JWT signing algorithm.
        ca_certs: Path to the CA certificates file used for TLS.
        mqtt_bridge_hostname: MQTT bridge host to connect to.
        mqtt_bridge_port: MQTT bridge port to connect to.

    Returns:
        The client, after ``connect`` and ``subscribe`` have been called.
    """
    # The client_id is a unique string that identifies this device. For
    # Google Cloud IoT Core, it must be in the format below.
    device_path = 'projects/{}/locations/{}/registries/{}/devices/{}'.format(
        project_id, cloud_region, registry_id, device_id)
    client = mqtt.Client(client_id=device_path)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    signed_token = create_jwt(project_id, private_key_file, algorithm)
    client.username_pw_set(username='unused', password=signed_token)

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register the module-level callbacks on the client.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # Subscribe to the topic on which the device receives configuration
    # updates.
    config_topic = '/devices/{}/config'.format(device_id)
    client.subscribe(config_topic, qos=1)
    return client
# [END iot_mqtt_config]

def parse_command_line_args():
    """Parse this script's command line options from ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``, with the
        attributes project_id, registry_id, device_id, private_key_file,
        algorithm, cloud_region, ca_certs, message_type,
        mqtt_bridge_hostname, mqtt_bridge_port and jwt_expires_minutes.
    """
    # One (flag, add_argument keyword arguments) entry per option, kept in
    # the order in which the options are registered.
    option_table = (
        ('--project_id', dict(
            default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
            help='GCP cloud project name')),
        ('--registry_id', dict(
            required=True,
            help='Cloud IoT Core registry id')),
        ('--device_id', dict(
            required=True,
            help='Cloud IoT Core device id')),
        ('--private_key_file', dict(
            required=True,
            help='Path to private key file.')),
        ('--algorithm', dict(
            choices=('RS256', 'ES256'),
            required=True,
            help='Which encryption algorithm to use to generate the JWT.')),
        ('--cloud_region', dict(
            default='us-central1',
            help='GCP cloud region')),
        ('--ca_certs', dict(
            default='roots.pem',
            help='CA root from https://pki.google.com/roots.pem')),
        ('--message_type', dict(
            choices=('event', 'state'),
            default='event',
            help=('Indicates whether the message to be published is a '
                  'telemetry event or a device state message.'))),
        ('--mqtt_bridge_hostname', dict(
            default='mqtt.googleapis.com',
            help='MQTT bridge hostname.')),
        ('--mqtt_bridge_port', dict(
            choices=(8883, 443),
            default=8883,
            type=int,
            help='MQTT bridge port.')),
        ('--jwt_expires_minutes', dict(
            default=20,
            type=int,
            help='Expiration time, in minutes, for JWT tokens.')),
    )

    cli = argparse.ArgumentParser(
        description='Example Google Cloud IoT Core MQTT device connection code.')
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()

# [START iot_mqtt_run]
def main():
    """Connect to the MQTT bridge and publish payloads in an endless loop.

    Each iteration processes network events, reconnects with exponential
    backoff after a disconnect, replaces the client once the JWT is older
    than ``--jwt_expires_minutes``, and publishes whatever
    ``payload.lerPayload()`` returns. The loop ends only when the backoff
    time has grown past MAXIMUM_BACKOFF_TIME.
    """
    global minimum_backoff_time

    log.append_log("Iniciando uma nova conexao com o Google IoT.")
    args = parse_command_line_args()

    jwt_iat = datetime.datetime.utcnow()
    jwt_exp_mins = args.jwt_expires_minutes
    client = get_client(
        args.project_id, args.cloud_region, args.registry_id, args.device_id,
        args.private_key_file, args.algorithm, args.ca_certs,
        args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    while True:
        # Process network events.
        client.loop()

        # Wait if backoff is required.
        if should_backoff:
            # If backoff time is too large, give up.
            if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
                log.append_log('Tempo maximo de backoff excedido. Desistindo.')
                break
            # Otherwise, wait (with up to one second of random jitter) and
            # connect again.
            delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
            log.append_log(
                'Esperando {} segundos antes de reconectar.'.format(delay))
            time.sleep(delay)
            minimum_backoff_time *= 2
            client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

        # [START iot_mqtt_jwt_refresh]
        # total_seconds() covers the whole interval; timedelta.seconds drops
        # whole days and would under-report the token's age. Truncated to
        # int so the logged value keeps its integer form.
        token_age = datetime.datetime.utcnow() - jwt_iat
        seconds_since_issue = int(token_age.total_seconds())
        if seconds_since_issue > 60 * jwt_exp_mins:
            log.append_log(
                'Atualizando token de acesso depois de {} segundos'.format(
                    seconds_since_issue))
            # NOTE(review): loop_start() is never called in this function, and
            # the old client is not disconnected before being replaced -
            # confirm whether an explicit disconnect is wanted here.
            client.loop_stop()
            jwt_iat = datetime.datetime.utcnow()
            client = get_client(
                args.project_id, args.cloud_region,
                args.registry_id, args.device_id, args.private_key_file,
                args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
                args.mqtt_bridge_port)
        # [END iot_mqtt_jwt_refresh]

        # Publish the payload to the MQTT topic. qos=1 means at least once
        # delivery. Cloud IoT Core also supports qos=0 for at most once
        # delivery.
        # The value is compared with 'sem payload' and otherwise indexed as
        # [0] = message kind and [1] = message body.
        payload_to_publish = payload.lerPayload()
        if payload_to_publish != 'sem payload':
            if payload_to_publish[0] == 'event':
                mqtt_topic = '/devices/{}/{}'.format(args.device_id, 'events')
                log.append_log('publicando [' + payload_to_publish[1] + ']')
            else:
                mqtt_topic = '/devices/{}/{}'.format(args.device_id, 'state')
            client.publish(mqtt_topic, payload_to_publish[1], qos=1)
# [END iot_mqtt_run]

if __name__ == '__main__':
    # Start the device loop only when run as a script, not when imported.
    main()

这是预期的行为:

  • 事件和状态主题用于设备到云的通信,配置主题用于将配置数据发送到 IoT 设备。订阅事件/状态主题实际上是一个NOOP。
  • Cloud IoT Core 对 QoS 1 提供的是"至少一次"的消息传递(同一条消息可能被重复送达);您可以尝试 QoS 0,它不等待 ACK,也不会重试消息传输,但我认为这不是您想要的
  • STATE 和 CONFIG 传输限制为每秒一条消息;传输配置更改的速度超过此速度将导致您看到的错误

最新更新