我使用paho mqtt模块发布到一个主题,并从另一个程序订阅它。我正在发布10000条消息,发布者能够在大约2秒内发送。在订阅者中,我正在获取消息并将值写入influxdb。在大约2000条记录之后,MQTT订阅者停止并等待time.sleep()完成。
import paho.mqtt.client as mqtt #import the client1
import time
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, Dialect, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
org = "my-ord"
bucket = "Bucket1"
token = "my-token"
client = InfluxDBClient(url="http://localhost:8086", token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
#Function to write the record to influx
def update_db(point):
write_api.write(bucket=bucket, record=point)
print("Point written")
msg_count = 0
#On message callback function
def on_message(client, userdata, message):
global msg_count
msg_count+=1
print("message received " ,str(message.payload))
_point1 = Point("mqtt2").tag("message","message").field("datapt",str(message.payload))
update_db(_point1)
print(msg_count)
#This is the Subscriber
ip = "localhost"
client = mqtt.Client("P2")
client.on_message=on_message
client.connect(ip)
client.loop_start()
client.subscribe("influx")
time.sleep(180)
client.loop_stop()
print(msg_count)
发布者在一秒钟内发布10000条消息。如果没有写入命令,代码将一直运行到最后。当我包括写入时,订阅者在大约2000条消息后停止。我应该做些什么改变才能使它起作用?
内流写入将是一个阻塞调用,您不应该在MQTT客户机回调中进行阻塞调用,因为它们会阻止客户机处理新的传入消息。
如果你需要阻塞IO作为传入消息的结果,你应该把这项工作交给一个单独的线程。