读取线程中的高频数据并使用 Tkinter 实时绘制图形



在过去的几周里,我一直在尝试制作一个应用程序,可以从OpenBCI Cyton(@250Hz(读取EEG数据并"实时"绘制图表。这里似乎效果更好的是线程。我应用了我在这里找到的提示 1 与 Tkinter 通信线程,但应用程序仍然不起作用(给我错误RecursionError: maximum recursion depth exceeded while calling a Python object(。也许我做错了什么,因为我尝试使用多个.py文件?请参阅下面我的代码的主要部分以及上下文中的更多注释:

###FILE main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from AppWindow import *
window = AppWindow()
window.start()
###FILE AppWindow.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
import scroller as scrl
import logging
import requests
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import random
from pandas import DataFrame
import stream_lsl_eeg as leeg

#Definitions
H = 720
W = 1280
#Color palette>> https://www.color-hex.com/color-palette/92077
bg_color = "#c4ac93"
sc_color = "#bba58e"
tx_color = "#313843"
dt_color = "#987f62"
wn_color = "#6b553b"

class AppWindow:
#Other Functions
def plotGraph(self, x, y):
self.ax.clear()
self.ax.plot(x,y, color = tx_color)
plt.subplots_adjust(bottom=0.31, left=0.136, top=0.9, right=0.99)
plt.ylabel('Magnitude', fontsize = 9, color = tx_color)
plt.xlabel('Freq', fontsize = 9, color = tx_color)
self.figure.canvas.draw()
def __init__(self):
self.root = tk.Tk() #start of application
self.root.wm_title("Hybrid BCI - SSVEP and Eye Tracker")
#Other Graphical Elements
#Button that calls function
self.btn_ReceiveEEG = tk.Button(self.EEG_frame, text = "Receive EEG signal", bg = bg_color, fg = tx_color, state = tk.DISABLED, command = lambda: leeg.getEEGstream(self))
self.btn_ReceiveEEG.place(anchor = 'nw', relx = 0.52, rely = 0.5, width = 196, height = 40)
#Other Graphical Elements
def start(self):
self.root.mainloop() #end of application
### FILE stream_lsl_eeg.py
from pylsl import StreamInlet, resolve_stream
import tkinter as tk
import AppWindow as app
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import threading
import queue
import time

class myThread(threading.Thread):
def __init__(self, name, q, f):
threading.Thread.__init__(self)
self.name = name
self.q = q
self.f = f
def run(self):
print("Starting ", self.name)
pullSamples(self.q, self.f) #place where function is called

def getInlet(app): #this is triggered by another button and it's working fine
global inlet
app.logger.warn('Looking for an EEG strean...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
app.logger.warn('Connected')
app.btn_ReceiveEEG.config(state = tk.NORMAL)

def pullSamples(q):
i = 0
while i<1000:
sample, timestamp = inlet.pull_sample()
threadLock.acquire()    #thread locks to put info in the queue
q.put([sample,timestamp]) #data is put in the queue for other threads to access
threadLock.release()    #thread unlocks after info is in
i += 1
stopcollecting = 1
print("Exit flag on")
def plotSamples(app, kounter): #Outside Thread
if not stopcollecting: #testing if stream reception stopped
while dataqueue.qsize(  ):
try:
kounter += 1
sample, timestamp = dataqueue.get(0)
samples.append(sample[0]) #getting just channel 1 (0)
timestamps.append(timestamp)
show_samples = samples[-250:]
show_timestamps = timestamps[-250:]
app.plotGraph(show_timestamps,show_samples)
print(counter) #this was just a control to count if the right amount of samples was coming out of the queue
except dataqueue.Empty:
pass #still not implemented, but will return to the main application
app.root.after(60, plotSamples(flag,app,kounter)) #60 chosen because plot should update every 15 samples (15/250 = 0,06s)   

def getEEGstream(app): #function called by button
app.logger.warn('Starting thread...')
#
kounter = 0
start = time.perf_counter()
thread1.start()
##
plotSamples(flag, app, kounter)
##
thread1.join() #I don't know if I need this...
finish = time.perf_counter()
#
print(f'Sizes: Samples [{len(samples)}, {len(samples[0])}], {len(timestamps)} timestamps')
print(f'Sucessfully streamed in {round(finish-start,3)}s!')
###
threadLock = threading.Lock()
dataqueue = queue.Queue()
stopcollecting = 0
kounter = []
flag = queue.Queue() #secondary queue for flags not used at the moment
flag.put(0)
thread1 = myThread("Thread-1", dataqueue,flag)
samples,timestamps = [],[]
show_samples, show_timestamps = [],[]

正如我在这里发现的 2,函数不应该调用自己,但它基本上就是这里 1 所做的。另外,我不认为我像这里 3 那样多次打电话给root.mainloop()

执行后,python 给了我以下错误/输出:

Exception in Tkinter callback
Traceback (most recent call last):
File "C:UsersroboticsAppDataLocalContinuumanaconda3envspsychopylibtkinter__init__.py", line 1705, in __call__
return self.func(*args)
File "C:UsersroboticsDocumentsgitDocumentsSSVEP_EyeGaze_pyAppWindow.py", line 109, in <lambda>
self.btn_ReceiveEEG = tk.Button(self.EEG_frame, text = "Receive EEG signal", bg = bg_color, fg = tx_color, state = tk.DISABLED, command = lambda: leeg.getEEGstream(self))
File "C:UsersroboticsDocumentsgitDocumentsSSVEP_EyeGaze_pystream_lsl_eeg.py", line 118, in getEEGstream
plotSamples(flag, app, kounter)
File "C:UsersroboticsDocumentsgitDocumentsSSVEP_EyeGaze_pystream_lsl_eeg.py", line 104, in plotSamples
app.root.after(60, plotSamples(flag,app,kounter))
File "C:UsersroboticsDocumentsgitDocumentsSSVEP_EyeGaze_pystream_lsl_eeg.py", line 104, in plotSamples
app.root.after(60, plotSamples(flag,app,kounter))
File "C:UsersroboticsDocumentsgitDocumentsSSVEP_EyeGaze_pystream_lsl_eeg.py", line 104, in plotSamples
app.root.after(60, plotSamples(flag,app,kounter))
[Previous line repeated 986 more times]
File "C:UsersroboticsDocumentsgitDocumentsSSVEP_EyeGaze_pystream_lsl_eeg.py", line 92, in plotSamples
while dataqueue.qsize(  ): # if not dataqueue.empty():
File "C:UsersroboticsAppDataLocalContinuumanaconda3envspsychopylibqueue.py", line 87, in qsize
with self.mutex:
RecursionError: maximum recursion depth exceeded while calling a Python object
Exit flag on

显然,这意味着线程正在成功执行,但plotSamples()正在崩溃。

有什么建议吗??

after()(类似于按钮的command=bind()(需要函数的名称,没有()和参数 - 它被称为callback-after将其发送给mainloopmainloop稍后使用()来运行它。

您将函数与()一起使用

app.root.after(60, plotSamples(flag,app,kounter))

所以它立即运行它(它不会将其发送到mainloop(,并且此函数再次运行相同的函数,同时运行相同的函数,等等 - 所以你创建递归。

它的工作原理就像

result = plotSamples(flag,app,kounter) # run at once
app.root.after(60, result)

如果你必须将函数与参数一起使用,那么你可以做

app.root.after(60, plotSamples, flag, app, kounter)

最终您可以使用lambda创建没有参数的函数

app.root.after(60, lambda:plotSamples(flag,app,kounter) )

最新更新