返回Python Paho MQTT中的数据on_message



我正在实现PAHO MQTT客户端。这是我的代码:

import paho.mqtt.client as mqtt

def mess(client, userdata, message):
   print("{'" + str(message.payload) + "', " + str(message.topic) + "}")

def subscribe(c_id, topic, server, port):
   cl = mqtt.Client(c_id)
   cl.connect(server, port)
   cl.subscribe(topic)
   cl.on_message = mess
   cl.loop_forever()

这可以正常工作,但是我不想在"混乱"中打印数据。我需要将print()中的字符串返回到调用功能。我正在从另一个程序中调用subscribe()。任何帮助,直接或建议阅读将不胜感激。

带有所显示的内容,您需要使用 global标志更新函数之外的 data变量。

data = ''
def mess(client, userdata, message):
  global data
  data = "{'" + str(message.payload) + "', " + str(message.topic) + "}"

subscribe函数也将永远不会返回,因为它调用cl.loop_forever()。如果您希望它返回,则应致电cl.loop_start()

subscribe中打印data也无法正常工作,因为客户端在启动网络循环之前实际上无法处理传入消息(打印之后的行)。

也不能保证订阅主题后何时会发出消息。

毫无了解地了解您要实现的目标,我将无法提供更多帮助,但我认为您需要回去看看您的整个方法,以了解酒吧/子消息传递的异步性质

我有完全相同的问题,而Hardillb的答案确实有帮助。我只想使用loop_start()loop_stop()给出完整的示例。

import paho.mqtt.client as mqtt
import time
current_pose = -1
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    print("The position will will be printed in [mm]")
    client.subscribe("send position",qos=1)
def on_message(client, userdata, msg):
    global current_pose
    current_pose = msg.payload.decode('utf8')
    print("Position = ",current_pose)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_start()
for i in range(0,10):
    time.sleep(1) # wait 1s
    print('current_pose = ', current_pose)
    
client.loop_stop() 
print('the position will not be updated anymore') 

在此示例中,该位置在十秒钟内每秒打印一次,此外,数据将由回调ON_MESSAGE打印。

而不是使用提到的全局变量方法,还可以使用Queue软件包。

import Queue
import paho.mqtt.client as mqtt
q = Queue.Queue() #initialises a first in first out queue
def mess(client, userdata, message):
    q.put(("{'" + str(message.payload) + "', " + str(message.topic) + "}"))
if not q.empty(): #check if the queue is empty
    msg = q.get()  #get the first message which was received and delete

这避免了使用时丢失任何传入的数据或任何被覆盖的访问数据的损坏。

最新更新