使用 MQTT Python 双向发布和订阅

  • 本文关键字:MQTT Python 使用 python mqtt
  • 更新时间 :
  • 英文 :


我目前在Raspberry Pi 3上编写了一个Python程序,用于读取湿度和温度传感器数据并将这些数据发布到主题中。然后,我可以使用笔记本电脑接收此数据。这是我用于读取传感器数据并将其发布到树莓派主题的代码:

import RPi.GPIO as GPIO
import time
import json
import Adafruit_DHT as dht
import math
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
# Creating the JSON Objects
dht22 = {}
arduino = {}
dht22Temp = []
dht22Hum = []
arduinoLED = []

dht22['temperature'] = dht22Temp
dht22['humidity'] = dht22Hum
dht22['sensor'] = 'DHT22'
arduino['blink'] = arduinoLED
arduino['actuator'] = 'arduinoLED'  
# Timing constants
E_PULSE = 0.0005
E_DELAY = 0.0005
def main():
# Main program block
while True:
h, t = dht.read_retry(dht.DHT22, 17) //Reading humidity and temp data from GPIO17
t = round(t,2)
h = round(h,2)
if t > 25: 
if len(arduinoLED) == 3:
arduinoLED.pop(0)
arduinoLED.append("true")
else: 
arduinoLED.append("true")
else:
if len(arduinoLED) == 3:
arduinoLED.pop(0)
arduinoLED.append("false")
else: 
arduinoLED.append("false")
if len(dht22Temp) == 3:
dht22Temp.pop(0)
dht22Temp.append(t)
else: 
dht22Temp.append(t)
if len(dht22Hum) == 3:
dht22Hum.pop(0)
dht22Hum.append(h)
else: 
dht22Hum.append(h)
# lm35dzTemp.append(tempc) 

# Publishing sensor information by JSON converting object to a string
publish.single("topic/sensorTemperature", json.dumps(dht22), hostname = "test.mosquitto.org")
publish.single("topic/sensorTemperature", json.dumps(arduino), hostname = "test.mosquitto.org")
# Printing JSON objects
print(dht22)
print(arduino)
time.sleep(2)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
pass
finally:
GPIO.cleanup()

这是我从笔记本电脑订阅和接收数据的代码:

import paho.mqtt.client as mqtt
import json
# This is the Subscriber
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("topic/sensorTemperature")

def on_message(client, userdata, msg):
print(json.loads(msg.payload)) #converting the string back to a JSON object
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("test.mosquitto.org", 1883, 60)
client.loop_forever()

我现在要做的是从我的笔记本电脑发布一些东西(也许在与订阅者相同的代码中,或者在单独的文件中,该文件只会发布对同一主题的消息 -"topic/sensorTemperature"(。但我的问题是:我如何在我的 Raspberry Pi 上发布和订阅消息(在我发布的第一个代码中(?由于我正在无限循环中将消息发布到我的笔记本电脑,因此我还需要一个无限循环来订阅相同(或不同的主题(以接收消息。如何同时运行其中两个循环?我需要两个不同的线程吗?

谢谢。

正如 Sergey 所建议的,您可以使用loop_start创建一个单独的线程来接收消息。

以下是您的主函数的外观:

def main():
# Create a new client for receiving messages
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.subscribe(topic)
client.connect(mqttserver)
client.loop_start()
while True:
#code for publishing
pass

最简单的方法是在 Raspberry 上并行启动另一个 Python 进程(类似于笔记本电脑的脚本(,处理从笔记本电脑接收的消息。

但是,如果要在一个脚本中实现所有内容,则可以使用第一个片段的实现(发布传感器数据(来扩展第二个代码片段(处理消息(。

当然,在这种情况下不能使用loop_forever((。当你调用loop_forever((时,它永远不会返回,直到客户端调用disconnect((,所以你无法处理收到的消息(主线程被阻止(。Paho 客户端还具有例程loop(( 和 loop_start((/loop_stop((来控制网络循环。 看看它们:

1( 函数loop((可以将超时作为参数。它将阻止,直到新消息到达或时间结束。在第一种情况下 - 处理收到的消息并计算下一次发布之前的时间。再次将此时间作为参数传递给 loop((。在第二种情况下,只需发布数据并调用 loop(( 直到下一次发布(示例中为 2 秒(。

2( loop_start((/loop_stop(( 启动和停止后台线程为您发送和接收(和处理(数据。创建客户端,注册on_message(( 回调,连接/订阅并调用 loop_start(( 来启动此线程。主线程现在对您是免费的 - 将其与第一个片段的逻辑一起使用(循环休眠 2 秒(。

只需在while True:之前将subscribing script中的代码放入publishing script,并将loop_forever()替换为loop_start()。在脚本退出时GPIO.cleanup()使用loop_stop()

最新更新