我正在尝试构建一个异步运行的脚本,同时具有基于stdin用户输入的启动/停止机制。
我创建了两个线程,一个用于异步工作,另一个用于从stdin读取。我的想法是程序运行直到用户键入";停止";在stdin中,异步任务等待直到用户键入";"开始";在stdin中。
这是我当前的代码:
class DataManager(metaclass=Singleton):
def __init__(self):
self.flag = threading.Event()
self.flag.set()
# Thread for reading user input
self.pool_thread = threading.Thread(target=self.__pool_input())
self.pool_thread.daemon = True
self.pool_thread.start()
# Method to create thread for asynchronous tasks
def start(self):
if self.flag.is_set():
self.flag.clear()
self.main_thread = threading.Thread(target=self.__main_wrapper)
self.main_thread.start()
# Method for reading user stdin
def __pool_input(self):
val = input()
if val == "stop":
self.stop()
elif val == "start":
self.start()
# Wrapper method to start the event loop and call async context
def __main_wrapper(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.__main())
loop.close()
# Main async loop
async def __main(self):
while not self.flag.is_set():
# Run async stuff
# ...
print('Thread is running')
await asyncio.sleep(5)
# Stop the main async loop on user command
def stop(self):
if not self.flag.is_set():
self.flag.set()
if __name__ == "__main__":
data_manager = DataManager()
data_manager.start()
预期行为
- 异步(主(线程在循环中运行
- 用户类型";停止";和异步线程停止
- 用户类型";"开始";异步循环再次运行
当前行为
- 异步线程被阻止,直到用户在stdin上键入
- 异步线程开始运行
- 异步线程运行时Stdin被阻止
除了必须以某种方式保持__pool_input
线程处于活动状态之外(因为一旦它读取输入,线程结束,我就再也不会启动它(,我不知道如何使所需的结果发挥作用。
您的程序不起作用,因为您不是将__pool_input
作为target
传递,而是调用它
self.pool_thread = threading.Thread(target=self.__pool_input)
我想这应该行得通。一旦正如你所说。
您可以像在__main_wrapper()
中那样,在__pool_input()
中创建一个无限循环,以便再次从stdin中读取。
话虽如此,我认为你应该改变你的设计。异步代码的伟大之处在于,您不需要线程(对于任何I/O(,并且线程化非常困难。所以最好尽量避开它们。最近也有一个关于异步input
的类似问题。也许你在里面找到了你喜欢的东西。