如何使用asyncio将基于事件的通信转换为async/await.未来



我正试图将基于事件的通信公开为协程。这里有一个例子:

class Terminal:
async def start(self):
loop = asyncio.get_running_loop()
future = loop.create_future()
t = threading.Thread(target=self.run_cmd, args=future)
t.start()
return await future
def run_cmd(self, future):
time.sleep(3)  # imitating doing something
future.set_result(1)

但当我这样运行时:

async def main():
t = Terminal()
result = await t.start()
print(result)
asyncio.run(main())

我得到以下错误:运行时间错误:等待没有与未来的一起使用

有可能实现期望的行为吗?

您的代码有两个问题。一种是Thread构造函数的args参数需要一个序列或可迭代的,因此您需要将该参数写入容器中,例如args=(future,)。由于future是可迭代的(由于与此用例无关的技术原因(,args=future不会立即被拒绝,而是会导致以后的误导性错误。

另一个问题是异步对象不是线程安全的,所以不能只从另一个线程调用future.set_result。这导致测试程序即使在修复了第一个问题之后也会挂起。从另一个线程解析未来的正确方法是通过事件循环上的call_soon_threadsafe方法:

class Terminal:
async def start(self):
loop = asyncio.get_running_loop()
future = loop.create_future()
t = threading.Thread(target=self.run_cmd, args=(loop, future,))
t.start()
return await future
def run_cmd(self, loop, future):
time.sleep(3)
loop.call_soon_threadsafe(future.set_result, 1)

如果你的线程真的只是调用一个你感兴趣的阻塞函数,那么考虑使用run_in_executor而不是手动生成线程:

class Terminal:
async def start(self):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, self.run_cmd)
# Executed in a different thread; `run_in_executor` submits the
# callable to a thread pool, suspends the awaiting coroutine until
# it's done, and transfers the result/exception back to asyncio.
def run_cmd(self):
time.sleep(3)
return 1

最新更新