Google IoT-接收通知的正确模式(订阅工作)



我正在遵循本教程,并且我已经将代码发布消息到/devices/sm1/events主题,其中sm1是我的设备ID。

我想知道如何订阅此主题,因为教程说使用/devices/sm1/config,但我收到空的消息。我已经尝试使用出版(/devices/sm1/events(中使用的相同的"路径",但它也不起作用。

奇怪的是,我给主题的名字是sm1,与我的设备相关的主题是在Googleiot控制台上显示为projects/myprojectname/topics/sm1。因此,除了发现如何订阅上述主题外,我还感谢与GoogleIot中使用Pub/sub主题的正确方法有关的任何解释(文档尚不清楚(。

这是我的subscribe.py

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
topic = "/devices/sm1/config"
def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)
def on_message(client, userdata, msg):
    print("Topic: {0}  --  Payload: {1}".format(msg.topic, msg.payload))
if __name__ == "__main__":    
    client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                         project_id,
                         cloud_region,
                         registry_id,
                         device_id))
    client.username_pw_set(username='unused',
                           password=jwt_maker.create_jwt(project_id,
                                               private_key,
                                               algorithm="RS256"))
    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)

    client.on_connect = on_connect
    client.on_message = on_message
    print "Connecting to Google IoT Broker..."
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()

我的输出:

与状态连接:0
主题:/设备/sm1/config-有效载荷:
主题:/设备/SM1/配置 - 有效载荷:

在阅读了@gabeweiss的评论部分中的讨论后,我认为您想要实现的目标和如何实现会有一些混乱(或什么(您要使用pub/sub。

鉴于我认为这里的问题更概念化,让我首先将您推荐给有关Cloud IoT核心密钥概念的通用文档页面,您实际上会在其中找到有关Cloud Iot Core和Pub/sub之间关系的一些信息。总而言之,设备遥测数据已发布到云物联网核心主题中,后来通过数据经纪发表到云酒吧/子主题中,您可以在其他目的上使用该主题:触发云功能,分析云功能,分析的流。带有数据流等的数据

现在,如果您遵循云物联网核心的快速启动指南,您将在给定点创建一个设备注册表,该设备注册表绑定到一个设备遥测事件的pub/sub主题。如果您希望写入多个主题,而不是写信给默认的酒吧/子主题,则可以按照文档的其他部分进行说明。

最后,解决订阅主题的问题(云物联网核心主题,而不是酒吧/子主题,因为只有前者与设备相关(,您可以使用以下命令订阅MQTT主题,其中(例如(,该设备订阅了配置主题,其中配置更新已发布:

# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)
# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)

然后,使用on_message()功能,您可以处理在您实际订阅的主题上发布的消息。请注意,有效载荷必须解析为字符串(str(message.payload)(。

def on_message(unused_client, unused_userdata, message):
    payload = str(message.payload)
    print('Received message '{}' on topic '{}' with Qos {}'.format(
            payload, message.topic, str(message.qos)))

然后,在您的问题中,您说您首先订阅了/devices/{device-id}/config,但这可能不是您想要的,因为这是主题是配置更新(即不是发布遥测事件的地方(。然后,我知道您应该订阅/devices/{device-id}/events,这是遥测指标的实际MQTT主题。如果这不起作用,可能会有另一个问题,因此请确保您正确解析message变量,并尝试将pub/sub与使用注册表创建的默认主题一起使用,以检查遥测指标是否正在正确发表与否。

edit :根据下面的评论澄清...

这里有两个GCP组件。有MQTT主题(这是/事件主题(,该设备将使用该主题与IoT Core交谈。然后是IoT核心中不在IoT核心中的projects/myprojectname/topics/sm1,它在Pub/sub中。当您将消息发送到/事件MQTT主题时,IoT Core Brokers从您的设备中发送的有效载荷发送到/事件MQTT主题,直到创建并将其附加到IoT Core注册中的酒吧/sub主题中。

要查看这些消息,您必须在pub/sub中对主题projects/myprojectname/topics/sm1创建订阅。如果您转到控制台,以及Pub/sub->主题。单击主题旁边的三个点,然后选择"新订阅"。订阅后,您可以从设备发送一些数据,然后在命令行上运行(假设已安装了GCLOUD工具(:

gcloud beta pubsub subscriptions pull --max-messages=3 <subscription_id>

要对消息进行任何操作,您可以脚本订阅pub/sub主题(查看pub/sub apis(,以触发添加到主题中的消息。

原始消息

您是否向设备发送配置消息?混乱可能是MQTT主题是单向的。

so:1(/事件主题适用于设备 -> iot core。2(/config主题适用于IoT Core Admin SDK->设备

在另一个地方的另一个脚本中,或从IoT Core UI界面中,您需要发送配置消息才能正确查看on_message fire。

在IoT Core UI(在Console.cloud.google.com上(,您可以深入到已注册的单个设备,在屏幕顶部,单击" Update Config"。将出现一个弹出窗口,使您可以将文本或base64编码消息发送到该设备,并且它将出现在/config主题上。

我不得不屈服于Google Pub/Sub Library,才能接收与我指定的主题相关的通知。

我的发布代码(仅重要的部分(:

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
mqtt_topic = "/devices/sm1/events"
client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                     project_id,
                     cloud_region,
                     registry_id,
                     device_id), protocol=mqtt.MQTTv311)
client.username_pw_set(username='unused',
                       password=jwt_maker.create_jwt(project_id,
                                           private_key,
                                           algorithm="RS256"))
client.tls_set(root_ca, 
               certfile = public_crt, 
               keyfile = private_key, 
               cert_reqs = ssl.CERT_REQUIRED, 
               tls_version = ssl.PROTOCOL_TLSv1_2, 
               ciphers = None)
client.connect(mqtt_url, mqtt_port, keepalive=60)
res = client.publish(mqtt_topic, some_data, qos=1)

在Google Cloud Platform Portal中,我必须在Pub/Sub部分中创建一个订阅,将其分配给我创建的主题,这已经是我的默认主题(可能链接到/事件主题(。创建的订阅具有以下格式:

projects/project_name/subscriptions/subscription_name

使用Google Pub/sub库订阅代码,因为无法使用MQTT协议:

from google.cloud import pubsub
def callback(message):
    print(message.data)
    message.ack()
project_id = "project_name"
subscription_name = "sm1"
subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{}/subscriptions/{}'.format(project_id, subscription_name)
subscription = subscriber.subscribe(subscription_name)
future = subscription.open(callback)
try:
    future.result()
except Exception as ex:
    subscription.close()
    raise

我希望这可以帮助任何人。可以在此处找到更多详细信息(github(。

最新更新