实时绘图传感器数据Python Raspberry pi



我有一个BNO055传感器通过I2C协议连接到我的树莓派。该传感器可以输出9个测量值,包括所有3个轴上的加速度。(x,y,z(

我想要一个x加速度作为时间函数的实时图。

我正试图在我的Raspberry Pi上为一个项目运行此代码。

def animate(i,t_updt,f,sp1,sensor1):
initialtime = time.perf_counter()
lastTime = initialtime;
times=np.array([])
x=np.array([])
while(1): 
currentTime = time.perf_counter()
if  (currentTime - lastTime >= 1/f):
lastTime = currentTime
times = np.append(times,lastTime - initialtime)
sens1Acc = sensor1.acceleration     
x = np.append(x,sens1Acc[0])       
if len(times) == t_updt*f:
break
#Clear and draw new values   
sp1.clear()
sp1.plot(times,x)
sp1.set_title('X - accel - sensor1')
sp1.set_ylim([-10,10])
import adafruit_bno055
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import datetime as dt
from busio import I2C
from board import SDA, SCL
from time import sleep
##Creating sensor1 object
sensor1 = adafruit_bno055.BNO055_I2C(i2c,40)
sensor1.mode = adafruit_bno055.NDOF_MODE
sleep(2) # wait for system to stabilize
f=50 #Sampling freq
fig = plt.figure()
sp1 = fig.add_subplot(1,1,1)       
ani = animation.FuncAnimation(fig, animate, fargs=(5,f,sp1,sensor1),interval=1000)
plt.show()

正如你所看到的,我已经实现了每1秒调用一次的animate函数。在这个函数的运行过程中,有一个while循环运行了5秒钟(因为传递了fargs[0]参数(,在这个循环中,我们以50赫兹的速率采样5秒钟的数据。

我唯一的问题是:图表不会更新。我已经打印出来,看到x值确实得到了更新,但绘图本身并没有更新。。。

有什么想法吗?

我在Pi上使用传感器来测量温度和湿度,在读取点时捕获时间/历元总体进近

  1. 包装类,用于读取传感器并返回包含时间/历元的传感器数据dict
  2. Pi上的控制器,使用包装器类并将数据发布到RESTful API(flask(
  3. 活动实时更新网页(使用AJAX和ChartJS

PiSensor.py

import os, platform
import time, datetime, random
try:
import Adafruit_DHT
except ImportError:
plat = "{sys}.{mc}".format(sys=platform.system(), mc=platform.machine())
if (plat == "Linux.armv7l"):
raise
else:
print(f"Adafruit_DHT not supported on this platform {plat}")
from typing import (Tuple, Optional)
try:
DHT_SENSOR = Adafruit_DHT.DHT11
except NameError:
pass
DHT_PIN = 17

#----------------------------------------------------------------
#   Note:
#       ds18b20's data pin must be connected to pin7.
#----------------------------------------------------------------
class PiSensor:
__hostname = ""
__latitude = 1.3740981993768249
__longitude = 103.85034994827384
def __init__(self) -> None:
self.__hostname = os.uname()[1].split(".")[0]
def set_gps(self, latitude:float, longitude:float) -> None:
self.__latitude = latitude
self.__longitude = longitude
def get_gps(self) -> Tuple[float, float]:
return self.__longitude, self.__latitude
def sensorData(self, id:str, temp:float, humidity:Optional[float]=None) -> list:
data = [{"id":id, "time":datetime.datetime.now().strftime("%I:%M:%S%p"),
"value":temp, "hostname":self.__hostname, "epoch":int(time.time()*1000),
"latitude":self.__latitude, "longitude":self.__longitude, "type":"temperature"}]
if humidity is not None:
data2 = data[0].copy()
data2["id"] = data2["id"] + "_humidity" # type: ignore
data2["value"] = humidity
data2["type"] = "humidity"
data.append(data2)
return data
# Reads temperature from sensor and prints to stdout
# id is the id of the sensor
def readSensor(self, id:str) -> list:
with open("/sys/bus/w1/devices/"+id+"/w1_slave") as tfile:
text = tfile.read()
secondline = text.split("n")[1]
temperaturedata = secondline.split(" ")[9]
temperature = float(temperaturedata[2:])
temperature = temperature / 1000
return self.sensorData(id, temperature)

# Reads temperature from all sensors found in /sys/bus/w1/devices/
# starting with "28-...
def readSensors(self) -> list:
count = 0
data:list = []
for file in os.listdir("/sys/bus/w1/devices/"):
if (file.startswith("28-")):
data += self.readSensor(file)
count+=1
if (count == 0):
print("No sensor found! Check connection")

humidity, temp = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
if humidity is not None and temp is not None:
data += self.sensorData("DHT11", temp, humidity)
else:
print("oh no DHT11 not working")

return data

控制器(仅示例代码(

def read_sensor(alive, mode:int, ser:SerialInterface, config) -> None:
sess = requests.Session()
sensor = PiSensor()
logenv.alog().info("thread started... sleeping {s}".format(s=config["sensorsleep"]))
data += sensor.readSensors()
try:
resp = sess.post("{u}/sensor_data".format(u=u), json=data, verify=False)
except requests.exceptions.ConnectionError:
# maybe one of servers has gone down....
logenv.alog().warning("no data sent {u} not alive".format(u=u))
finally:
sess.close()
logenv.alog().info("shutting down thread")

API烧瓶

@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
c = mycache()
dfsensor = c.get("dfsensor")
newsensor = json_normalize(request.get_json())
newsensor[["x","y"]] = newsensor[["epoch", "value"]]
newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
newsensor["amin"] = newsensor["value"]
newsensor["amax"] = newsensor["value"]
newsensor = newsensor.drop(columns=["x","y"])
# add new data from serial interface to start of list (append old data to new data).
# default time as now to new data
dfsensor = newsensor.append(dfsensor, sort=False)
# keep size down - only last 500 observations
c.set("dfsensor", dfsensor[:500])
del dfsensor
return jsonify(result={"status":"ok"})
# used in conjnction with AJAX functionality in home.html template
@app.route('/serial_json', methods=['GET'])
def serial_json() -> Response:
type = request.args.get("type", "temperature")
freq = request.args.get("freq", "")
graphtype = request.args.get("graphtype")
df = mycache().get("dfsensor{f}".format(f=freq))
df = df[df["type"]==type] # type: ignore
# resample data down to proportion of data set and defined number of points
if "sensorcookie" in request.cookies:
ck = json.loads(urllib.parse.unquote(request.cookies.get("sensorcookie")))
if not "pointsslider" in ck:
ck = {"pointsslider": 100, "pointstoplot":20, **ck}
else:
ck = {"pointsslider": 100, "pointstoplot":20}
df, min_epoch, max_epoch = sensor_ts.resample_for_plot(df, int(ck["pointsslider"]), int(ck["pointstoplot"]))
extdata = restdata_source.tickdata(df=df, required="graph", graphtype=graphtype)
del df
resp = make_response(jsonify(result={"data":extdata}))
ck = {"min_epoch":int(min_epoch), "max_epoch":int(max_epoch), **ck}
resp.set_cookie("sensorcookie", json.dumps(ck))
return resp

最新更新