使用 python 和 tkinter 实时绘制串行数据



我一直在努力寻找一种方法,在 Python GUI 中绘制从 Arduino 传入的数据。我能够使用 Matplotlib 的动画函数读取 6 个不同的变量,并绘制其中 4 个:2 个在一个子图上,另外 2 个在另一个子图上。这一过程足够快,可以实时绘图(每秒 20 个样本)。

我现在需要修改系统,使其同时读取 12 个不同的变量,并绘制其中 8 个:4 个在一个子图上,另外 4 个在另一个子图上,采样速率同样是每秒 20 个样本。我一直无法让它正常工作;我尝试了几种不同的做法,也做了很多研究,但以我有限的 Python 知识,似乎还是弄不清楚该怎么做。我对多进程和多线程都不太熟悉,但人们似乎正是靠它们来加快绘图过程的。我知道 matplotlib 的动画函数本身是线程化的,所以我不确定线程能带来多大帮助,也不确定是否可以在一个线程中读取数据、在另一个线程中更新图形。我使用的是 Arduino 支持的最高波特率 250000。我还找到了一个例子,有人在下面这篇文章中实现了非常高的绘图速度,但我没能把它修改成适合我的用法:wxPython 的最佳实时绘图小部件是什么?

从Arduino接收的数据如下所示:

integer.integer.integer|integer.integer.integer|integer.integer.integer|integer.integer.integer

其中竖线(|)表示一个新的执行器(即我发送的每个变量来自哪个执行器)

我对 Python 相当陌生,如果代码不够 pythonic,还请见谅。下面是两个例子。第一个是使用动画功能的 GUI:

import Tkinter
import serial
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import matplotlib.animation as animation
from collections import deque
import random
class App:
    """Tk window that plots pressure and displacement of four actuators live.

    Each serial line is expected to look like
    ``p.d.c|p.d.c|p.d.c|p.d.c`` (pressure, displacement, cycle count per
    actuator). Pressures are drawn on the upper subplot and displacements on
    the lower one; only the most recent ``WINDOW`` samples stay in view.
    """

    ACTUATORS = 4              # "|"-separated groups expected on every line
    WINDOW = 50                # number of most recent samples kept in view
    FRAME_INTERVAL_MS = 50     # redraw period; several lines may arrive per frame
    MAX_LINES_PER_FRAME = 100  # upper bound so one frame cannot stall the GUI

    def __init__(self, master):
        """Open the serial port and build the buttons and the two subplots.

        Args:
            master: the Tk root (or other container) that hosts the widgets.
        """
        self.arduinoData = serial.Serial('com5', 250000)
        frame = Tkinter.Frame(master)
        self.running = False
        self.ani = None
        self._clear_series()

        # The widget is stored as start_frame: storing it as self.start would
        # shadow the start() method and make self.start() uncallable.
        self.start_frame = Tkinter.LabelFrame(
            frame, text="Start", borderwidth=10, relief=Tkinter.GROOVE,
            padx=10, pady=10)
        self.start_frame.grid(row=0, column=0, padx=20, pady=20)
        self.run = Tkinter.Button(
            self.start_frame, text="RUN", bd=10, height=5, width=10,
            command=self.getData)
        self.run.grid(row=0, column=0, padx=5, pady=5)

        self.stop_frame = Tkinter.LabelFrame(
            frame, text="STOP", borderwidth=10, relief=Tkinter.GROOVE,
            padx=10, pady=10)
        self.stop_frame.grid(row=0, column=1, padx=20, pady=20)
        self.stop = Tkinter.Button(
            self.stop_frame, text="STOP", bd=10, height=5, width=10,
            command=self.stopTest)
        self.stop.grid(row=0, column=0, padx=5, pady=5)

        self.fig = Figure()
        self.ax1 = self.fig.add_subplot(211)
        self._pressure_lines = [
            self.ax1.plot([], [], lw=2)[0] for _ in range(self.ACTUATORS)]
        self.line0, self.line1, self.line2, self.line3 = self._pressure_lines
        self.ax2 = self.fig.add_subplot(212)
        self._displacement_lines = [
            self.ax2.plot([], [], lw=2)[0] for _ in range(self.ACTUATORS)]
        self.line4, self.line5, self.line6, self.line7 = \
            self._displacement_lines

        self.canvas = FigureCanvasTkAgg(self.fig, master=master)
        # draw() replaces the deprecated FigureCanvasTkAgg.show().
        self.canvas.draw()
        self.canvas.get_tk_widget().grid(row=0, column=4, padx=20, pady=20)
        frame.grid(row=0, column=0, padx=20, pady=20)

    def _clear_series(self):
        """Reset the sample counter and every recorded series to empty."""
        self.k = 0
        self.xdata = []
        self.pressures = [[] for _ in range(self.ACTUATORS)]
        self.displacements = [[] for _ in range(self.ACTUATORS)]
        self.cycles = [0] * self.ACTUATORS
        # Numbered aliases are kept for code that reads them directly.
        (self.pressure1, self.pressure2,
         self.pressure3, self.pressure4) = self.pressures
        (self.displacement1, self.displacement2,
         self.displacement3, self.displacement4) = self.displacements
        self.cycle1, self.cycle2, self.cycle3, self.cycle4 = self.cycles

    @classmethod
    def _parse_line(cls, text):
        """Parse one serial line into ``[(pressure, displacement, cycle), ...]``.

        Returns None when the line is truncated or malformed (for example the
        partial line left over right after the input buffer was flushed).
        """
        groups = text.strip().split("|")
        if len(groups) != cls.ACTUATORS:
            return None
        samples = []
        for group in groups:
            parts = group.split(".")
            if len(parts) != 3:
                return None
            try:
                samples.append(tuple(int(part) for part in parts))
            except ValueError:
                return None
        return samples

    def _append_samples(self, samples):
        """Record one parsed line as the next sample of every series."""
        self.xdata.append(self.k)
        for idx, (pressure, displacement, cycle) in enumerate(samples):
            self.pressures[idx].append(pressure)
            self.displacements[idx].append(displacement)
            self.cycles[idx] = cycle
        self.cycle1, self.cycle2, self.cycle3, self.cycle4 = self.cycles
        self.k += 1

    def _rescale(self, axis, group, first):
        """Fit *axis* to the samples from index *first* onwards of *group*."""
        low = min(min(series[first:]) for series in group)
        high = max(max(series[first:]) for series in group)
        axis.set_ylim(low - 1, high + 1)
        x_low = self.xdata[first]
        # Keep the x range non-degenerate while only one sample exists.
        axis.set_xlim(x_low, max(self.xdata[-1], x_low + 1))

    def _redraw(self):
        """Push the recorded series into the lines and rescale both axes."""
        for line, series in zip(self._pressure_lines, self.pressures):
            line.set_data(self.xdata, series)
        for line, series in zip(self._displacement_lines, self.displacements):
            line.set_data(self.xdata, series)
        first = max(0, len(self.xdata) - self.WINDOW)
        self._rescale(self.ax1, self.pressures, first)
        self._rescale(self.ax2, self.displacements, first)

    def getData(self):
        """RUN button: start the animation, or resume it after a stop."""
        if self.ani is None:
            self.start()
            return
        self.arduinoData.flushInput()
        self.arduinoData.write(b"<L>")
        self.ani.event_source.start()
        self.running = True

    def stopTest(self):
        """STOP button: send the halt command and pause the animation."""
        self.arduinoData.write(b"<H>")
        if self.running and self.ani is not None:
            self.ani.event_source.stop()
        self.running = False

    def resetTest(self):
        """Discard all recorded samples and return both axes to 0..1."""
        self._clear_series()
        for line in self._pressure_lines + self._displacement_lines:
            line.set_data([], [])
        for axis in (self.ax1, self.ax2):
            axis.set_ylim(0, 1)
            axis.set_xlim(0, 1)

    def start(self):
        """Clear old data, create the animation and send the start command."""
        self._clear_series()
        self.arduinoData.flushInput()
        self.ani = animation.FuncAnimation(
            self.fig,
            self.update_graph,
            interval=self.FRAME_INTERVAL_MS,
            repeat=True)
        self.arduinoData.write(b"<L>")
        self.running = True
        # NOTE(review): the animation is started explicitly, as in the
        # original code; _start() is a private Matplotlib method, so confirm
        # it still exists in the Matplotlib version in use.
        self.ani._start()

    def update_graph(self, i):
        """Animation callback: read every waiting line, then redraw once.

        Reading all pending lines per frame (instead of exactly one line after
        a busy-wait) keeps the plot from falling behind the serial stream.

        Args:
            i: frame number supplied by FuncAnimation (unused).
        """
        received = False
        for _ in range(self.MAX_LINES_PER_FRAME):
            if self.arduinoData.inWaiting() == 0:
                break
            raw = self.arduinoData.readline()
            samples = self._parse_line(raw.decode("ascii", "ignore"))
            if samples is None:
                continue  # skip truncated or corrupted lines
            self._append_samples(samples)
            received = True
        if received:
            self._redraw()


# Create the Tk root window, build the application inside it, and hand
# control to the Tk event loop (mainloop() does not return until the window
# is closed).
root = Tkinter.Tk()
app = App(root)
root.mainloop()

这是一个打印到显示器的 GUI:

import Tkinter
import serial
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import matplotlib.animation as animation
import time
class App:
    """Tk window that reads actuator samples from serial and prints them.

    Each serial line is expected to look like
    ``p.d.c|p.d.c|p.d.c|p.d.c`` (pressure, displacement, cycle count per
    actuator). The port is opened non-blocking (``timeout=0``), so incoming
    bytes are buffered here until a full line has arrived.
    """

    ACTUATORS = 4   # "|"-separated groups expected on every line
    POLL_MS = 10    # delay between serial polls; lets Tk process its events

    def __init__(self, master):
        """Open the serial port and build the buttons and the (idle) plot.

        Args:
            master: the Tk root (or other container) that hosts the widgets.
        """
        self.master = master
        self.arduinoData = serial.Serial('com5', 250000, timeout=0)
        frame = Tkinter.Frame(master)
        self.go = 0
        self._poll_id = None   # id of the pending after() callback, if any
        self._rx_buffer = b""  # bytes received but not yet newline-terminated
        self._clear_series()

        # The widget is stored as start_frame: storing it as self.start would
        # shadow the start() method and make self.start() uncallable.
        self.start_frame = Tkinter.LabelFrame(
            frame, text="Start", borderwidth=10, relief=Tkinter.GROOVE,
            padx=10, pady=10)
        self.start_frame.grid(row=0, column=0, padx=20, pady=20)
        self.run = Tkinter.Button(
            self.start_frame, text="RUN", bd=10, height=5, width=10,
            command=self.getData)
        self.run.grid(row=0, column=0, padx=5, pady=5)

        self.stop_frame = Tkinter.LabelFrame(
            frame, text="STOP", borderwidth=10, relief=Tkinter.GROOVE,
            padx=10, pady=10)
        self.stop_frame.grid(row=0, column=1, padx=20, pady=20)
        self.stop = Tkinter.Button(
            self.stop_frame, text="STOP", bd=10, height=5, width=10,
            command=self.stopTest)
        self.stop.grid(row=0, column=0, padx=5, pady=5)

        self.fig = Figure()
        self.ax1 = self.fig.add_subplot(211)
        self._pressure_lines = [
            self.ax1.plot([], [], lw=2)[0] for _ in range(self.ACTUATORS)]
        self.line0, self.line1, self.line2, self.line3 = self._pressure_lines
        self.ax2 = self.fig.add_subplot(212)
        self._displacement_lines = [
            self.ax2.plot([], [], lw=2)[0] for _ in range(self.ACTUATORS)]
        self.line4, self.line5, self.line6, self.line7 = \
            self._displacement_lines

        self.canvas = FigureCanvasTkAgg(self.fig, master=master)
        # draw() replaces the deprecated FigureCanvasTkAgg.show().
        self.canvas.draw()
        self.canvas.get_tk_widget().grid(row=0, column=4, padx=20, pady=20)
        frame.grid(row=0, column=0, padx=20, pady=20)

    def _clear_series(self):
        """Reset the sample counter and every recorded series to empty."""
        self.k = 0
        self.xdata = []
        self.pressures = [[] for _ in range(self.ACTUATORS)]
        self.displacements = [[] for _ in range(self.ACTUATORS)]
        self.cycles = [0] * self.ACTUATORS
        # Numbered aliases are kept for code that reads them directly.
        (self.pressure1, self.pressure2,
         self.pressure3, self.pressure4) = self.pressures
        (self.displacement1, self.displacement2,
         self.displacement3, self.displacement4) = self.displacements
        self.cycle1, self.cycle2, self.cycle3, self.cycle4 = self.cycles

    @classmethod
    def _parse_line(cls, text):
        """Parse one serial line into ``[(pressure, displacement, cycle), ...]``.

        Returns None when the line is truncated or malformed.
        """
        groups = text.strip().split("|")
        if len(groups) != cls.ACTUATORS:
            return None
        samples = []
        for group in groups:
            parts = group.split(".")
            if len(parts) != 3:
                return None
            try:
                samples.append(tuple(int(part) for part in parts))
            except ValueError:
                return None
        return samples

    def _append_samples(self, samples):
        """Record one parsed line as the next sample of every series."""
        self.xdata.append(self.k)
        for idx, (pressure, displacement, cycle) in enumerate(samples):
            self.pressures[idx].append(pressure)
            self.displacements[idx].append(displacement)
            self.cycles[idx] = cycle
        self.cycle1, self.cycle2, self.cycle3, self.cycle4 = self.cycles
        self.k += 1

    def _latest_row(self):
        """Return the newest sample as ``"p1 d1 c1 p2 d2 c2 ..."``."""
        values = []
        for idx in range(self.ACTUATORS):
            values.extend((self.pressures[idx][-1],
                           self.displacements[idx][-1],
                           self.cycles[idx]))
        return " ".join(str(value) for value in values)

    def _consume(self, chunk):
        """Add received bytes to the buffer and process every complete line.

        Returns the number of valid samples that were recorded and printed.
        """
        self._rx_buffer += chunk
        lines = self._rx_buffer.split(b"\n")
        # The last piece has no newline yet; keep it for the next chunk.
        self._rx_buffer = lines.pop()
        accepted = 0
        for raw in lines:
            samples = self._parse_line(raw.decode("ascii", "ignore"))
            if samples is None:
                continue  # skip truncated or corrupted lines
            self._append_samples(samples)
            self.printData()
            accepted += 1
        return accepted

    def getData(self):
        """RUN button: clear old data and start polling the serial port."""
        self._clear_series()
        self._rx_buffer = b""
        self.arduinoData.flushInput()
        self.go = 1
        # Start a polling chain only if none is pending, so pressing RUN
        # twice does not create two concurrent chains.
        if self._poll_id is None:
            self.readData()

    def readData(self):
        """Read whatever bytes are waiting, then reschedule itself via Tk."""
        self._poll_id = None
        if self.go != 1:
            return
        waiting = self.arduinoData.inWaiting()
        if waiting:
            self._consume(self.arduinoData.read(waiting))
        # A non-zero delay (instead of a busy-wait) keeps the GUI responsive.
        self._poll_id = self.master.after(self.POLL_MS, self.readData)

    def printData(self):
        """Print the newest sample of every actuator on one line."""
        if self.k == 0:
            return  # nothing recorded yet
        print(self._latest_row())

    def stopTest(self):
        """STOP button: send the halt command and stop polling."""
        self.arduinoData.write(b"<H>")
        self.go = 0
        if self._poll_id is not None:
            self.master.after_cancel(self._poll_id)
            self._poll_id = None

    def resetTest(self):
        """Discard all recorded samples and return both axes to 0..1."""
        self._clear_series()
        for line in self._pressure_lines + self._displacement_lines:
            line.set_data([], [])
        for axis in (self.ax1, self.ax2):
            axis.set_ylim(0, 1)
            axis.set_xlim(0, 1)

    def start(self):
        """Clear old data and send the start command to the Arduino."""
        self._clear_series()
        self.arduinoData.write(b"<L>")
# Create the Tk root window, build the application inside it, and hand
# control to the Tk event loop (mainloop() does not return until the window
# is closed).
root = Tkinter.Tk()
app = App(root)
root.mainloop()

下面是一个示例Arduino代码:

// Analog input pin numbers; analog0 and analog1 are read in sensordata().
int analog0 = 0;
int analog1 = 1;
int analog2 = 2;
// Latest raw analogRead() results (sensor2 is not used in this sketch).
int sensor0;
int sensor1;
int sensor2;
// NOTE(review): pot0, pot1, Force, Forceholder, Is and Sensordata are not
// referenced anywhere in this sketch — presumably leftovers; confirm.
String pot0;
String pot1;
String Force;
// Copies of sensor0/sensor1 that are formatted into the outgoing line.
int pot0holder;
int pot1holder;
String Forceholder;
// Sample counter: incremented per transmitted line, reset by 'S' and 'L'.
unsigned long i = 0;
String Is;
// Numeric argument of the last 'L' command.
int val = 0;
boolean Sensordata = false;
// Numeric argument of the last 'S' command.
int cycles;
// Buffer for one "<...>" command and the flag marking it as complete.
const byte numChars = 32;
char receivedChars[numChars];
boolean newData = false;
// Timestamp taken at the top of every loop() pass.
unsigned long CurrentMillis = 0;
unsigned long PrintMillis = 0;
// Set by the 'T' command; not read elsewhere in this sketch.
int PrintValMillis = 50;
// Time of the last sample and the minimum spacing between samples (ms).
unsigned long SensorMillis = 0;
int SensorValMillis = 0;
void setup()
{
  // One-time initialisation: open the serial link at 250000 baud.
  Serial.begin(250000);
}
void loop()
{
  // Take the timestamp first; sensordata() compares against it.
  CurrentMillis = millis();

  recvWithStartEndMarkers();  // collect bytes of a "<...>" command
  commands();                 // act on a completed command, if any
  sensordata();               // sample the inputs and transmit one line
}
void sensordata()
{
  // Not yet time for the next sample: nothing to do on this pass.
  if (CurrentMillis - SensorMillis < SensorValMillis)
  {
    return;
  }

  sensor0 = analogRead(analog0);
  pot0holder = sensor0;
  sensor1 = analogRead(analog1);
  pot1holder = sensor1;
  i += 1;

  // Emit four "pressure.displacement.count" groups separated by '|'; the
  // groups are the same two readings shifted by 0, 30, 60 and 90.
  String potcolumn = "";
  for (int offset = 0; offset <= 90; offset += 30)
  {
    if (offset > 0)
    {
      potcolumn += "|";
    }
    potcolumn += String(pot0holder + offset) + "." + String(pot1holder + offset) + "." + String(i);
  }
  Serial.println(potcolumn);

  SensorMillis += SensorValMillis;
}
// Collect one command framed as "<...>" from the serial port.
// Bytes between '<' and '>' are stored in receivedChars; when '>' arrives the
// buffer is NUL-terminated and newData is set so commands() can process it.
// Bytes outside a frame are discarded.
void recvWithStartEndMarkers()
{
  static boolean recvInProgress = false; // true while inside a "<...>" frame
  static byte ndx = 0;                   // next write position in receivedChars
  char startMarker = '<';                // begins a command
  char endMarker = '>';                  // ends a command
  char rc;                               // byte just read

  // Stop reading once a command is complete so it is not overwritten
  // before commands() has handled it.
  while (Serial.available() > 0 && newData == false) {
    rc = Serial.read();
    if (recvInProgress == true) {
      if (rc != endMarker) {
        receivedChars[ndx] = rc;
        ndx++;
        // Clamp so an over-long command overwrites the last slot instead
        // of running past the end of the buffer.
        if (ndx >= numChars) {
          ndx = numChars - 1;
        }
      }
      else {
        receivedChars[ndx] = '\0'; // terminate the string
        recvInProgress = false;
        ndx = 0;
        newData = true;
      }
    }
    else if (rc == startMarker) {
      recvInProgress = true;
    }
  }
}
void commands()
{
  // Dispatch on the command letter; the digits after it are its argument.
  if (newData)
  {
    const char *argument = &receivedChars[1];
    switch (receivedChars[0])
    {
      case 'T':
        PrintValMillis = atoi(argument); // atoi: text to integer
        break;
      case 'S':
        cycles = atoi(argument);
        i = 0;
        break;
      case 'L':
        val = atoi(argument);
        i = 0;
        break;
      default:
        break;
    }
  }
  // Always release the buffer so the receiver can accept the next command.
  newData = false;
}

提前感谢任何人的任何帮助或建议。

所以,读取过程占用了大部分时间。我会把读取放到一个单独的进程中,并在主(绘图)进程中对数据进行解析/拆分。不幸的是,我不是 tkinter 用户,所以下面的代码没有使用任何特定的 GUI 框架。但我认为你可以根据自己的需求进行调整。

那看起来像这样:

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import multiprocessing as mp
import time

# Module-level plot state shared by update1(), update2() and run().
fig = plt.figure(1)
# Upper subplot: one initially empty line plus the lists that back it.
ax1 = fig.add_subplot(211)
line1, = ax1.plot([], [], lw=2)
ax1.grid()
xdata1, ydata1 = [], []
# Lower subplot: same layout as the upper one.
ax2 = fig.add_subplot(212)
line2, = ax2.plot([], [], lw=2)
ax2.grid()
xdata2, ydata2 = [], []
# Queue that carries (x, y) samples from the generator process to run().
q = mp.Queue()
# Sample producer, run in a separate process.
# Replace the body with the Arduino reader in a real application.
def dataGen(output):
    """Put 50 ``(x, sin(x))`` tuples, x = 0..49, onto the queue *output*."""
    sample_count = 50
    for step in range(sample_count):
        sample = (step, np.sin(step))
        output.put(sample)
# Feed one sample into the upper subplot.
def update1(data):
    """Append the ``(t, y)`` sample to line1 and widen ax1 if it is reached.

    Returns a one-element tuple holding line1.
    """
    sample_t, sample_y = data
    xdata1.append(sample_t)
    ydata1.append(sample_y)

    x_low, x_high = ax1.get_xlim()
    y_low, y_high = ax1.get_ylim()
    # Each limit that the sample reaches is doubled.
    if sample_t >= x_high:
        ax1.set_xlim(x_low, 2 * x_high)
    if sample_y >= y_high:
        ax1.set_ylim(y_low, 2 * y_high)
    if sample_y <= y_low:
        ax1.set_ylim(2 * y_low, y_high)

    line1.set_data(xdata1, ydata1)
    return (line1,)
# Feed one sample into the lower subplot.
def update2(data):
    """Append the ``(t, y)`` sample to line2 and widen ax2 if it is reached.

    Returns a one-element tuple holding line2.
    """
    sample_t, sample_y = data
    xdata2.append(sample_t)
    ydata2.append(sample_y)

    x_low, x_high = ax2.get_xlim()
    y_low, y_high = ax2.get_ylim()
    # Each limit that the sample reaches is doubled.
    if sample_t >= x_high:
        ax2.set_xlim(x_low, 2 * x_high)
    if sample_y >= y_high:
        ax2.set_ylim(y_low, 2 * y_high)
    if sample_y <= y_low:
        ax2.set_ylim(2 * y_low, y_high)

    line2.set_data(xdata2, ydata2)
    return (line2,)
# called at each drawing frame
def run(data):
    """Take one sample from the queue and update both subplots.

    Args:
        data: frame value supplied by FuncAnimation (ignored; the sample is
            read from the queue instead).

    Returns:
        Tuple of the artists to redraw, as blitting requires.
    """
    try:
        from queue import Empty   # Python 3
    except ImportError:
        from Queue import Empty   # Python 2

    # Wait up to half a second for the producer process. When it has
    # finished or stalled there is nothing new to draw, so return the
    # existing artists instead of letting Empty abort the animation.
    try:
        data = q.get(block=True, timeout=.5)
    except Empty:
        return line1, line2
    # put here your variable separation
    data1 = (2*data[0], 3*data[1])
    data2 = (data[0], data[1])
    # provide the data to the plots
    a = update1(data1)
    b = update2(data2)
    # Full redraw so that changed axis limits become visible.
    fig.canvas.draw()
    return a+b
if __name__ == "__main__":
    # Number of reader processes to launch.
    n_proc = 1
    # Create the workers first, then start each one as a daemon.
    pool = [mp.Process(target=dataGen, args=(q,)) for _ in range(n_proc)]
    for worker in pool:
        worker.daemon = True
        worker.start()
    # Give the processes a few seconds to come alive.
    time.sleep(3)
    # Start drawing; the reference is kept in `ani` while the window is open.
    ani = animation.FuncAnimation(
        fig,
        run,
        frames=60,
        blit=True,
        interval=10,
        repeat=False,
    )
    plt.show()
    print('done')

我的问题和你的问题非常相似。 我需要每 80 毫秒从 profi-bus 网络获取数据,并且我希望在采样时绘制数据。

我使用多进程解决了这个问题,两个进程之间通过管道(Pipe)通信。当绘图进程从采集进程拿到数据并准备绘图时,它会向采集进程发送一条消息;采集进程随即停止发送数据,改为把数据存入列表。绘图进程完成绘制后,会通知采集进程"现在可以发送数据了",于是采集进程发送数据并清空列表。

import time
import numpy as np
from matplotlib import pyplot as plt
import multiprocessing
from multiprocessing import Process, Pipe
from random import random
class DataGennerator(object):
    """Generate demo samples and pass them to the plotter in batches.

    Samples are collected in ``data_buffer`` and sent over ``data_pipe`` as a
    list of rows. While the plotter reports that it is drawing (it sends True
    and later False through the pipe), samples are held back in the buffer.
    """

    def __init__(self, data_pipe):
        """Store the pipe end used to talk to the plotter process."""
        super(DataGennerator, self).__init__()
        print('Data Gennerator Init...')
        self.data_buffer = []
        self.t = 0
        self.start_time = 0
        self.data_pipe = data_pipe
        self.plot_inprogess = False
        self.data_ready = False

    def run(self):
        """Produce 400 samples, then flush whatever is still buffered."""
        self.start_time = time.time()
        for _ in range(400):
            self.loop_cycle()
        # Without this flush the last buffered samples would never reach
        # the plotter.
        if self.data_buffer:
            self.data_pipe.send(self.data_buffer)
            self.data_buffer = []
            self.data_ready = False
        print('Total Time:', time.time()-self.start_time)
        print('Run completion......')

    def loop_cycle(self):
        """Create one ``[t, sin(t), cos(2t), cos(4t), random]`` row and queue it."""
        self.t = time.time()-self.start_time
        new_data = [self.t, np.sin(self.t), np.cos(2*self.t),
                    np.cos(self.t*4), random()]
        self.send_data(new_data)
        time.sleep(0.08)

    def send_data(self, new_data):
        """Buffer *new_data* and send the buffer when the plotter is idle.

        The sample is always appended first; the original code skipped the
        append on the sending path and lost that sample.
        """
        self.data_buffer.append(new_data)
        if self.plot_inprogess or not self.data_ready:
            self.data_ready = True
            # Wait up to 0.1 ms for a status message from the plotter.
            if self.data_pipe.poll(0.0001):
                self.plot_inprogess = self.data_pipe.recv()
        else:
            self.data_pipe.send(self.data_buffer)
            self.data_buffer = []
            self.data_ready = False
# Entry point of the collector process.
def get_data(data_pipe):
    """Run a DataGennerator on *data_pipe*, then tell the plotter to exit."""
    generator = DataGennerator(data_pipe)
    generator.run()
    # 'EXIT' is the sentinel the plotter loop looks for.
    data_pipe.send('EXIT')
    print('>>> Finished')

# use plotter_pipe to communicate with data collector
# and when get data from the collector, update the figure
def updata_plot(plotter_pipe, plot_inprogess=True):
    """Plot batches received from the collector until it sends 'EXIT'.

    Args:
        plotter_pipe: pipe end connected to the data collector. Batches
            arrive as lists of ``[time, v1, v2, v3, v4]`` rows.
        plot_inprogess: unused; kept so existing callers keep working.

    After the collector finishes, all rows are written to ``data.txt`` and
    the figure stays open until the window is closed.
    """
    fig, ax = plt.subplots(nrows=4, ncols=1, figsize=(6, 8), sharex=True)
    fig.set_tight_layout(True)
    styles = ['rs-', 'gs-', 'bs-', 'ro-', 'go-', 'bo-']*10
    # Labels follow the signals the generator produces, column by column.
    names = ['sin(t)', 'cos(2t)', 'cos(4t)', 'random']
    lines = []
    for index, name in enumerate(names):
        line, = ax[index].plot([], [], styles[index], label=name,
                               markersize=4, markerfacecolor='w')
        ax[index].set_ylabel(name, color=styles[index][0], fontweight='bold')
        lines.append(line)
    ax[-1].set_xlabel('Time /s')
    fig.align_ylabels(ax)
    plt.ion()
    plt.show(block=False)
    plt.draw()
    # Read the first data package and convert it to a NumPy array.
    data_array = np.array(plotter_pipe.recv())
    while True:
        # Batch sent by the collector: [[row1], [row2], ...]
        data_buffer = plotter_pipe.recv()
        # The collector sends the string 'EXIT' when it is done.
        if isinstance(data_buffer, str) and data_buffer == 'EXIT':
            break
        # Tell the collector that plotting is in progress.
        plotter_pipe.send(True)
        data_array = np.append(data_array, np.array(data_buffer), axis=0)
        for i, line in enumerate(lines):
            line.set_xdata(data_array[:, 0])
            line.set_ydata(data_array[:, i+1])
            ax[i].relim()
            ax[i].autoscale_view()
        fig.canvas.draw()
        plt.pause(0.001)
        # Tell the collector that plotting has finished.
        plotter_pipe.send(False)
    print('>>> Stop receiving data')
    # One comma-separated row per line ('\n' had lost its backslash).
    data_content = '\n'.join(
        [', '.join(map(str, data_line)) for data_line in data_array])
    with open('data.txt', 'w', encoding='UTF-8') as f:
        f.write('time, xx, yy, zz, bb\n')
        f.write(data_content)
    plt.show(block=True)
if __name__ == '__main__':
    plot_inprogess = True
    # Duplex pipe: one end for the collector, the other for the plotter.
    data_pipe, plotter_pipe = multiprocessing.Pipe(True)
    P1 = Process(target=get_data, args=(data_pipe,))
    P2 = Process(target=updata_plot, args=(plotter_pipe,))
    # Start the collector before the plotter, then wait for both to finish.
    for proc in (P1, P2):
        proc.start()
    for proc in (P1, P2):
        proc.join()

相关内容

  • 没有找到相关文章

最新更新