MQTT & Postgresql DB:使用 python 将 mqtt 消息插入到 postgresql 表中的列中



我是PostgreSQL的新手,我想使用pysycopg2将mqtt消息插入PostgreSQL数据库。不幸的是,它没有按预期工作。我认为这是一个简单的错误,但无法弄清楚错误的确切原因。首先,我使用python脚本[1]在mosquitto broker中发布了mqtt消息,然后从另一个脚本[2]订阅并尝试存储到postgresql中。[3]中显示了相应的错误消息。

以下是我的Publisher脚本,用于将伪造的mqtt-json数据发布到mosquittobroker:

#!/usr/bin/python
import paho.mqtt.client as mqtt
import numpy as np
import time
broker_address = "localhost"
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()
while True:
time.sleep(0.05)
degrees = np.random.random_sample()
toa = np.random.random_sample()
humidity = np.random.random_sample()
json =  ('''[{"time": "2020-04-01 21:00:00",  "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]''')
locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'
locpk= str(locpk)
json= str(json)
client.publish("device14/geo", locpk, 1, 1)
client.publish("device14/geo", json, 1, 1)

以下是我的订阅者脚本,用于订阅已发布的消息并插入PostgreSQL:

#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("device14/geo",0)
def on_message(client, userdata, msg):
Date = datetime.datetime.utcnow()
message= msg.payload.decode()
try:
#print the JSON Message with Data and Topic
print(str(Date) + ": " + msg.topic + " " + str(message))
#concatenate the SQL string
sql_string = "INSERT INTO table_name(column_name)nVALUES %s" % (message)
#execute the INSERT statement
cur = conn.cursor()
cur.execute(sql_string)
#commit the changes to the database
conn.commit()
print("Finished writing to PostgreSQL")
except (Exception, Error) as err:
print("npsycopg2 connect error:", err)
#print("Could not insert " + message + " into Postgresql DB")
#Set up a client for Postgresql DB
try:
#read connection parameters
params = config()
#connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
#create a cursor
cur = conn.cursor()
#execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')
cur.execute(sql)
#display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
try:
client.connect("localhost", 1883, 60)
connOK = True
except:
connOK = False
time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()

错误:

/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py

Connecting to the PostgreSQL database...
PostgreSQL database version:
no results to fetch
Connected with result code 0
Received a message on topic: device14/geo

2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00",   "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]

psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
^**

Received a message on topic: device14/geo
2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}
psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...

期待您的评论。如有任何帮助,我们将不胜感激。

PS:我也尝试过更改发布消息结构(即locpk、json(,但没有帮助。如果您对发布的消息结构有任何建议,请告诉我。我会试试的。

我没有看到table_name的表结构,但如果它只有一列(column_name(,并且您想在其中存储JSON文档,则需要在PostgreSQL中将其定义为jsonb
在这样的列中插入数据很容易:

from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
data = (Json(message),)
cur.execute(query, data)
conn.commit()
...

但是,对整个消息使用单列并不是一个好的设计选择
time, device_addr, latitude, longitude, altitude等公共键创建列(我只是根据提供的数据猜测(
将不太重要(可能丢失(的键存储在一个单独的jsonb列中(例如称为data(。

最新更新