我在脚本中运行两个单独的进程。第一个进程 p1 启动一个 oneSecondTimer 例程,该例程正好在 1 秒处执行并执行一些工作。第二个过程,p2,触发一个键盘侦听器,嗯,听键盘。
目前,我希望 p1 进程在用户按下转义键时停止。我尝试使用全局变量,它不起作用。我尝试使用队列,它奏效了,但它绝对不是最优雅的解决方案。它实际上是一个丑陋的解决方法,不会扩大规模。
最终,脚本将具有许多单独的并行进程,这些进程将通过按各种键来控制(而不仅仅是启动/停止(。 这是代码,
import time
from pynput import keyboard
from multiprocessing import Process, Queue
def on_release(key):
if key == keyboard.Key.esc:
print('escaped!')
# Stop listener
return False
def keyboardListener(q):
with keyboard.Listener(on_release=on_release) as listener:
listener.join()
print('Keybord Listener Terminated!!!')
# Make the queue NOT EMPTY
q.put('Terminate')
def oneSecondTimer(q):
starttime = time.time()
# Terminate the infinite loop if
# queue is NOT EMPTY
while (not q.qsize()):
print("tick")
time.sleep(1.0 - ((time.time() - starttime) % 1.0))
return False
if __name__ == '__main__':
q = Queue()
p1 = Process(target=oneSecondTimer, args=(q,))
p1.start()
p2 = Process(target=keyboardListener, args=(q,))
p2.start()
终于设法完成了这项工作。 在上面的代码片段中,当我调用
对于键盘侦听器事件,它实质上是阻止执行键盘侦听器(q(进程的其余部分,直到on_release(键(函数停止。因为这正是 .join(( 应该做的。这是一个阻止调用。listener.join((
在下面的代码片段中,keyboard.listener 线程只是在 keyboardListener(q( 进程中启动的。while 循环跟踪名为 fetchKeyPress 的变量。该变量执行名称所暗示的操作,它在 on_release(key( 子例程中获取按下的键。由fetchKeyPress获取的按键被泵入名为q的队列中,该队列在两个进程之间共享,键盘听众(key(和oneSecondTimer(q(。keyboardListener 进程的运行速度是 oneSecondTimer 进程的 4 倍,具有退出 while 循环的逻辑,并在用户连续/重复按下相同的键时防止队列泛滥。
oneSecondTimer(q( 进程每秒运行一次。如果 q 不为空,它会吐出 q 中的任何内容。它还内置了一个 while 循环退出逻辑。
现在,我可以利用进程 p2 获取的数据(按键(,并将其用于其他并行运行的进程 p1。
p2 是生产者。p1 是消费者。
import time
from pynput import keyboard
from multiprocessing import Process, Queue
fetchKeyPress = 10
def on_release(key):
global fetchKeyPress
fetchKeyPress = key
if key == keyboard.Key.esc:
fetchKeyPress = 0
print('escaped!')
# Stop listener
return False
def keyboardListener(q):
global fetchKeyPress
prevKeyFetch = 10 # Keep track of the previous keyPress
keyboard.Listener(on_release=on_release).start()
while (fetchKeyPress):
print ('Last Key Pressed was ', fetchKeyPress)
# Fill the Queue only when a new key is pressed
if (not (fetchKeyPress == prevKeyFetch)):
q.put(fetchKeyPress)
# Update the previous keyPress
prevKeyFetch = fetchKeyPress
time.sleep(0.25)
print('Keybord Listener Terminated!!!')
q.put('Terminate')
def oneSecondTimer(q):
runner = True # Runs the while() loop
starttime = time.time()
while (runner):
print('ttick')
if (not q.empty()):
qGet = q.get()
print ('tQueue Size ', q.qsize())
print ('tQueue out ', qGet)
# Condition to terminate the program
if (qGet == 'Terminate'):
# Make runner = False to terminate the While loop
runner = False
time.sleep(1.0 - ((time.time() - starttime) % 1.0))
return False
if __name__ == '__main__':
q = Queue()
p1 = Process(target=oneSecondTimer, args=(q,))
p1.start()
p2 = Process(target=keyboardListener, args=(q,))
p2.start()
但我认为归根结底,由于简单性,我可能只是要使用 https://pypi.org/project/keyboard/库。感谢@toti08在上面的评论中提出的建议。