基本上,我的目标是收集数据并在运行时发送它,并且我想异步地完成它-每个消息一个协同例程。我试图建立一个小的例子,但它并没有超出我的期望。我假设我需要两个无限循环和一个队列让它们相互通信——一个循环获取用户输入,另一个循环监听队列并在队列中有内容时输出数据。
下面是我的例子:
import asyncio
async def output(queue):
while True:
data = await queue.get()
if data:
await asyncio.sleep(5)
print(data)
async def main():
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.create_task(output(queue))
while True:
data = await loop.run_in_executor(None, input)
await queue.put(data)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
预期结果是这样的(>>>表示输入和<<<输出):>
>>> 1
--- 0.0001 seconds delay before I enter my next input
>>> 2
--- 5 seconds delay
<<< 1
--- ~0.0001 seconds delay
<<< 2
但是实际的结果是这样的:
>>> 1
--- 0.0001 seconds delay before I enter my next input
>>> 2
--- 5 seconds delay
<<< 1
--- 5 seconds delay
<<< 2
我如何做到这一点?
您说您希望"每条消息一个协同例程"。所以只要那样做——然后你就不需要Queue了。我在您的print语句中添加了一些时间戳,这样您就可以看到两个5秒等待实际上是并行运行的。我也取代了你的主要()函数中的代码与一个简单的调用方便函数asyncio.run()。
import time
import asyncio
async def output(data):
await asyncio.sleep(5)
print("<<<", data, time.ctime())
async def main():
loop = asyncio.get_event_loop()
while True:
data = await loop.run_in_executor(None, input)
print(">>>", data, time.ctime())
asyncio.create_task(output(data))
if __name__ == "__main__":
asyncio.run(main())
>>> 1 Tue Oct 5 21:44:18 2021
>>> 2 Tue Oct 5 21:44:18 2021
<<< 1 Tue Oct 5 21:44:23 2021
<<< 2 Tue Oct 5 21:44:23 2021