通过一个简单的例子,我试图演示一个具有两个进程的典型多处理设置:
- 接收数据(此处通过随机数组生成模拟(
- 发送数据
我想通过键盘按键终止第一个进程,这将向队列发送None
,然后队列停止程序。我使用键盘包来检测是否按下了某个键。
import multiprocessing
import keyboard
import numpy as np
def getData(queue):
KEY_PRESSED = False
while KEY_PRESSED is False:
if keyboard.is_pressed("a"):
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
while FLAG_STOP is False:
data = queue.get() # # ch, samples
if data is None:
print("STOP in processData")
FLAG_STOP = True
else:
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()
如果我调试代码,按下的键实际上会被检测到,但同时while循环中的随机数据会被放入队列中。这使得调试代码变得非常困难。
此外,我还尝试了pynput包,它创建了一个线程来检测按下的键。然而,使用这种方法也出现了同样的问题,程序并没有通过向其他进程发送None
来保存地终止执行。
如果有人能指出所描述的方法中的错误,或者提出另一种在进程中保存检测按键的方法,我将非常高兴。
我不确定你描述的是什么问题:savely不是英语单词。你说按下的键实际上被检测到了。如果是这种情况,并且两个函数中都有print("STOP...")
语句,那么如果您只是在命令提示符下运行代码,并且getData
检测到有人按下了a
,那么我看不出两个打印语句最终都不会执行,并且这两个进程终止。
如果问题是程序在很长一段时间内没有终止,那么我认为您缺少的是,除非对keyboard.is_pressed("a")
的调用是一个执行速度特别慢的函数,否则当您按下键盘上的a
时,函数getData
在写入None
之前已经向队列写入了数千条记录。这意味着processData
必须首先读取这数千条记录并打印它们,然后才能最终到达None
记录。由于它还必须打印数字,processData
无法跟上getData
。在getData
写入其None
记录之后,processData
仍有数千条记录要读取。
这可以在代码的变体中得到证明,其中函数getData
不等待键盘输入,而是简单地在写入其None
记录并终止之前将随机数写入输出队列5秒(这模拟了在按下a
之前等待5秒的程序(。函数processData
打印它在到达None
记录之前读取的记录数,以及读取和打印这些记录所花费的时间:
import multiprocessing
import numpy as np
import time
def getData(queue):
KEY_PRESSED = False
expiration = time.time() + 5
# Run for 5 seconds:
while KEY_PRESSED is False:
if time.time() > expiration:
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
t = time.time()
cnt = 0
while FLAG_STOP is False:
data = queue.get() # # ch, samples
if data is None:
print("STOP in processData")
print('Number of items read from queue:', cnt, 'elapsed_time:', time.time() - t)
FLAG_STOP = True
else:
cnt += 1
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()
打印:
...
Processing Data
0.21449036510257957
Processing Data
0.27058883046461824
Processing Data
0.5433716680659376
STOP in processData
Number of items read from queue: 128774 elapsed_time: 35.389172077178955
尽管getData
只写了5秒的数字,但processData
读取和打印数字需要35秒。
这个问题可以通过限制队列实例上的消息数量来解决:
queue = multiprocessing.Queue(1)
这将阻止getData
将下一个值放入队列,直到processData
读取该值为止。
打印:
...
Processing Data
0.02822635996071321
Processing Data
0.05390434023333657
Processing Data
STOP in getData
0.9030600225686444
STOP in processData
Number of items read from queue: 16342 elapsed_time: 5.000030040740967
因此,如果使用最大大小为1的队列,则程序应在按下a
键后立即终止