计时器不能从另一个线程停止(nidaqmx-python 和回调的简短示例)



我在这个论坛上看到了关于这个主题的其他问题,但没有一个帮助我理解如何处理这个问题。在我看来,它们中的大多数也是关于相当复杂和冗长的代码。我相信我正在做一些相当简单的事情/想做一些相当简单的事情。希望有人能帮忙!下面是广泛的解释,然后是我当前的代码。

注意:请不要删除此问题。我考虑了很多,并仔细浏览了相关的线程,但无济于事。我也认为发布这个是有意义的,因为它部分与一个更通用的问题有关:如何在后台运行回调的同时实时绘制(见最后的摘要),这可以总结为我的总体目标。

设置和目标:NI 采集 模 块 (这 很重要) NI cDAQ9178, 通过nidaqmx-python接口, 该 包 由 NI 维护, 并 包含 此处 的 文档。在那里输入一些模拟信号,目标是以一定的采样率(大约 1000 Hz)连续采集它(直到我决定停止采集),同时实时绘制信号。绘图几乎不需要经常刷新(10Hz刷新率甚至可以)。我在conda虚拟环境中使用Windows 10和Python 3.7,编辑是在PyCharm中完成的。理想情况下,事情应该在 PyCharm 和任何终端中都有效。

情况:nidaqmx-python提供了高级函数,允许人们注册回调(根据自己的意愿定义),每次一定数量的样本(在我的例子中为 100,但这并不严格)填满 PC 缓冲区时都会调用这些回调。这个想法是,下面定义的回调在该点读取缓冲区并执行某些操作(在我的例子中,一些低通过滤,为了简洁起见,我将其取出,一些存储到全局变量data中,也许绘制 - 见下文)。

问题:我一直在玩弄将实时数据包含在回调中,但是对于 matplotlib,这是一场噩梦,因为回调使用主线程以外的线程,而 matplotlib 不喜欢从主线程之外的任何地方调用。我已经用谷歌搜索了其他针对实时绘图优化的库(而且,我在想,希望线程安全),但这并不容易:我无法让 vispy 工作,我甚至无法让 pyqtgraph 安装,只是给你一些例子。然后我在互联网上看到了几篇帖子,人们实际上使用 matplotlib 管理相当不错的实时动画,尽管它是在开发时考虑发布而不是这些应用程序;所以我想让我们试一试。

的看法:由于我无法让 matplotlib 从回调内部完成工作,因此我做了以下操作(这是您在下面看到的代码):在回调之后和任务以task.start()启动后(特定于nidaqmx-python),我只是创建一个绘制全局变量bufferwhile循环。我认为这是一个很好的技巧:看,buffer每 0.1 秒左右通过回调更新(没关系)(没关系),另一方面,while循环一遍又一遍地绘制buffer变量,每次在绘制之前擦除,有效地产生实时绘图。

注意:我完全知道绘图部分并没有达到应有的水平(我可能应该使用 matplotlib 和subplots的 ax API,更不用说动画了),但我目前不在乎。我稍后会处理它并对其进行优化以使其更有效率。

我想要的:这实际上做了我想要的...除了,为了阻止它,我在while循环中引入了try:except:语句,如下面的代码所示。当然,按CTRL+C确实会打破循环......但它随后也会破坏整个运行脚本,并给我留下以下错误:forrtl: error (200): program aborting due to control-C event,在 PyCharm 中,以及从终端运行时的以下精度:

Image              PC                Routine            Line        Source
libifcoremd.dll    00007FFECF413B58  Unknown               Unknown  Unknown
KERNELBASE.dll     00007FFF219F60A3  Unknown               Unknown  Unknown
KERNEL32.DLL       00007FFF23847BD4  Unknown               Unknown  Unknown
ntdll.dll          00007FFF240CCED1  Unknown               Unknown  Unknown
QObject::~QObject: Timers cannot be stopped from another thread

不便的是,我别无选择,只能关闭python shell(再次考虑PyCharm),并且我无法访问我宝贵的变量data,其中包含...好吧,我的数据。

猜:显然,回调不喜欢在这个法中停止。nidaqmx_python任务应通过task.stop()停止。我尝试将task.stop()放在键盘中断except:之后,但这无济于事,因为CTRL+C停止脚本在顶部/而不是中断while循环。我相信需要一些更复杂的方法来停止我的任务。我已经考虑了好几天,但想不出同时拥有这两件事的方法:我可以停止的任务,同时实时绘图。请注意,如果没有绘图,ENTER按键时很容易停止任务:只需在最后写入

input('Press ENTER to stop task')
task.stop()

但是,当然,简单地执行上述操作不允许我包含实时绘图部分。

总结:我无法从连续读取数据的回调中调用 matplotlib,所以我在一个单独的块中编写了一个用于实时绘制的while循环,但随后我看不到在没有收到上述错误的情况下停止该while循环的方法(它抱怨回调是从不同的线程停止的, 我认为)。

我希望我说清楚,如果没有,请问!

代码:我已经清理了它,使其尽可能接近显示问题的 MWE,尽管我当然意识到你们中的大多数人都没有 NI daq 来玩转和连接以便能够运行它。无论如何。。。在这里:

import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants
sfreq = 1000
bufsize = 100
with nidaqmx.Task() as task:
# Here we set up the task ... nevermind
task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
samps_per_chan=bufsize)
# Here we define a stream to be read continuously
stream = stream_readers.AnalogMultiChannelReader(task.in_stream)
data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback
# This is my callback to read data continuously
def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize is passed to num_samples when this is called
global data
global buffer
buffer = np.zeros((1, num_samples))
# This is the reading part
stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
data = np.append(data, buffer, axis=1)  # appends buffered data to variable data
return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).
# Here is the heavy lifting I believe: the above callback is registered
task.register_every_n_samples_acquired_into_buffer_event(bufsize, reading_task_callback)
task.start()  # The task is started (callback called periodically)
print('Acquiring sensor data. Press CTRL+C to stop the run.n')  # This should work ...
fig = plt.figure()
try:
while True:
# Poor's man plot updating
plt.clf()
plt.plot(buffer.T)
plt.show()
plt.pause(0.01)  # 100 Hz refresh rate
except KeyboardInterrupt:  # stop loop with CTRL+C ... or so I thought :-(
plt.close(fig)
pass
task.stop()  # I believe I never get to this part after pressing CTRL+C ...
# Some prints at the end ... nevermind
print('Total number of acquired samples: ', len(data.T),'n')
print('Sampling frequency: ', sfreq, 'Hzn')
print('Buffer size: ', bufsize, 'n')
print('Acquisition duration: ', len(data.T)/sfreq, 'sn')

任何意见将不胜感激。提前谢谢大家!

编辑:在下面接受的答案之后,我重写了上面的代码并提出了以下内容,现在可以按预期工作(对不起,这次我没有清理它,有些行与当前问题无关):

# Stream read from a task that is set up to read continuously
import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants
from scipy import signal
import threading
running = True
sfreq = 1000
bufsize = 100
bufsizeb = 100
global task
def askUser():  # it might be better to put this outside of task
global running
input("Press return to stop.")
running = False
def main():
global running
global data
global buffer
global data_filt
global buffer_filt
global b
global z
print('Acquiring sensor data...')
with nidaqmx.Task() as task:  # maybe we can use target as above
thread = threading.Thread(target=askUser)
thread.start()
task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
samps_per_chan=bufsize)
# unclear samps_per_chan is needed here above or why it would be different than bufsize
stream = stream_readers.AnalogMultiChannelReader(task.in_stream)
data = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
buffer = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback
data_filt = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
buffer_filt = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback
b = signal.firwin(150, 0.004)
z = signal.lfilter_zi(b, 1)
def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsizeb is passed to num_samples
global data
global buffer
global data_filt
global buffer_filt
global z
global b
if running:
# It may be wiser to read slightly more than num_samples here, to make sure one does not miss any sample,
# see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
buffer = np.zeros((1, num_samples))
stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
data = np.append(data, buffer, axis=1)  # appends buffered data to variable data
# IIR Filtering, low-pass
buffer_filt = np.zeros((1, num_samples))
for i, x in enumerate(np.squeeze(buffer)):  # squeeze required for x to be just a scalar (which lfilter likes)
buffer_filt[0,i], z = signal.lfilter(b, 1, [x], zi=z)
data_filt = np.append(data_filt, buffer_filt, axis=1)  # appends buffered filtered data to variable data_filt
return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).
task.register_every_n_samples_acquired_into_buffer_event(bufsizeb, reading_task_callback)  # bufsizeb instead
task.start()
while running:  # this is perfect: it "stops" the console just like sleep in a way that the task does not stop
plt.clf()
plt.plot(buffer.T)
plt.draw()
plt.pause(0.01)  # 100 Hz refresh rate
# plt.close(fig)  # maybe no need to close it for now
# task.join()  # this is for threads I guess ... (seems useless to my case?)
# Some prints at the end ...
print('Total number of acquired samples:', len(data.T))
print('Sampling frequency:', sfreq, 'Hz')
print('Buffer size:', bufsize)
print('Acquisition duration:', len(data.T)/sfreq, 's')
if __name__ == '__main__':
main()

请注意,我毕竟不需要task.stop(),因为连续采集任务使用此包的方式是,在task.start()之后读取任何不是sleep或类似代码行的代码都会使任务停止(至少这是我的理解)。

我做的第一件事是摆脱键盘中断循环。我用一个全局变量running替换了它,另一个线程将变量设置为False返回时。

def askUser():
global running
input("Press return to stop.")
running = False

然后,在while loop之前,创建了一个将执行此函数的新线程。

askUserThread = threading.Thread(target=askUser)
askUserThread.start()

对于while循环,摆脱trycatch语句:

while running:
plt.clf()
plt.plot(buffer.T)
plt.draw()          # Note: this got changed because .show wasn't working.
plt.pause(0.01)

这仍然对我不起作用,因为我不得不关闭情节窗口才能显示新的。所以从这个答案开始,我把它从.show改成了.draw.

我的结束代码有点不同(因为我采样了随机数据),但它就在这里。

# sampling.py
# by Preston Hager
import matplotlib.pyplot as plt
import numpy as np
import threading
sfreq = 1000
bufsize = 100
running = True
data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback
def askUser():
global running
input("Press return to stop.")
running = False
def readingTask():
global data
global buffer
while running:
buffer = np.random.rand(1, bufsize)
# This is the reading part
data = np.append(data, buffer, axis=1)  # appends buffered data to variable data
def main():
global running
print('Acquiring sensor data.')
thread = threading.Thread(target=askUser)
thread.start()
task = threading.Thread(target=readingTask)
task.start()
fig = plt.figure()
while running:
# Poor's man plot updating
plt.clf()
plt.plot(buffer.T)
plt.draw()
plt.pause(0.01)  # 100 Hz refresh rate
plt.close(fig)
task.join()
# Some prints at the end ... nevermind
print('Total number of acquired samples:', len(data.T))
print('Sampling frequency:', sfreq, 'Hz')
print('Buffer size:', bufsize)
print('Acquisition duration:', len(data.T)/sfreq, 's')
if __name__ == '__main__':
main()

最新更新