如何在Python进程中检测按键



通过一个简单的例子,我试图演示一个具有两个进程的典型多处理设置:

  1. 接收数据(此处通过随机数组生成模拟(
  2. 发送数据

我想通过键盘按键终止第一个进程,这将向队列发送None,然后队列停止程序。我使用键盘包来检测是否按下了某个键。

import multiprocessing
import keyboard
import numpy as np
def getData(queue):
KEY_PRESSED = False
while KEY_PRESSED is False:

if keyboard.is_pressed("a"):
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):

FLAG_STOP = False
while FLAG_STOP is False:
data = queue.get()  # # ch, samples
if data is None:
print("STOP in processData")
FLAG_STOP = True
else:
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()

如果我调试代码,按下的键实际上会被检测到,但同时while循环中的随机数据会被放入队列中。这使得调试代码变得非常困难。

此外,我还尝试了pynput包,它创建了一个线程来检测按下的键。然而,使用这种方法也出现了同样的问题,程序并没有通过向其他进程发送None来保存地终止执行。

如果有人能指出所描述的方法中的错误,或者提出另一种在进程中保存检测按键的方法,我将非常高兴。

我不确定你描述的是什么问题:savely不是英语单词。你说按下的键实际上被检测到了。如果是这种情况,并且两个函数中都有print("STOP...")语句,那么如果您只是在命令提示符下运行代码,并且getData检测到有人按下了a,那么我看不出两个打印语句最终都不会执行,并且这两个进程终止。

如果问题是程序在很长一段时间内没有终止,那么我认为您缺少的是,除非对keyboard.is_pressed("a")的调用是一个执行速度特别慢的函数,否则当您按下键盘上的a时,函数getData在写入None之前已经向队列写入了数千条记录。这意味着processData必须首先读取这数千条记录并打印它们,然后才能最终到达None记录。由于它还必须打印数字,processData无法跟上getData。在getData写入其None记录之后,processData仍有数千条记录要读取。

这可以在代码的变体中得到证明,其中函数getData不等待键盘输入,而是简单地在写入其None记录并终止之前将随机数写入输出队列5秒(这模拟了在按下a之前等待5秒的程序(。函数processData打印它在到达None记录之前读取的记录数,以及读取和打印这些记录所花费的时间:

import multiprocessing
import numpy as np
import time
def getData(queue):
KEY_PRESSED = False
expiration = time.time() + 5
# Run for 5 seconds:
while KEY_PRESSED is False:
if time.time() > expiration:
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
t = time.time()
cnt = 0
while FLAG_STOP is False:
data = queue.get()  # # ch, samples
if data is None:
print("STOP in processData")
print('Number of items read from queue:', cnt, 'elapsed_time:', time.time() - t)
FLAG_STOP = True
else:
cnt += 1
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()

打印:

...
Processing Data
0.21449036510257957
Processing Data
0.27058883046461824
Processing Data
0.5433716680659376
STOP in processData
Number of items read from queue: 128774 elapsed_time: 35.389172077178955

尽管getData只写了5秒的数字,但processData读取和打印数字需要35秒。

这个问题可以通过限制队列实例上的消息数量来解决:

queue = multiprocessing.Queue(1)

这将阻止getData将下一个值放入队列,直到processData读取该值为止。

打印:

...
Processing Data
0.02822635996071321
Processing Data
0.05390434023333657
Processing Data
STOP in getData
0.9030600225686444
STOP in processData
Number of items read from queue: 16342 elapsed_time: 5.000030040740967

因此,如果使用最大大小为1的队列,则程序应在按下a键后立即终止

相关内容

  • 没有找到相关文章

最新更新