如何使用asyncio并发运行多个协程



我使用websockets库在Python 3.4中创建websocket服务器。下面是一个简单的回显服务器:

import asyncio
import websockets
@asyncio.coroutine
def connection_handler(websocket, path):
    while True:
        msg = yield from websocket.recv()
        if msg is None:  # connection lost
            break
        yield from websocket.send(msg)
start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

假设我们——另外——想要在某个事件发生时向客户端发送消息。为简单起见,让我们每隔60秒周期性地发送一条消息。我们该怎么做呢?我的意思是,因为connection_handler一直在等待传入的消息,服务器只能在收到来自客户机的消息后才能采取操作,对吗?我遗漏了什么?

也许这个场景需要一个基于事件/回调的框架,而不是一个基于协程的框架?龙卷风吗?

您可以使用gather

来自Python文档:

import asyncio
async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
asyncio.run(main())
# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

TL;DR您可以使用asyncio.create_task()同时运行多个协程


也许这个场景需要一个基于事件/回调的框架,而不是一个基于协程的框架?龙卷风吗?

不,您不需要任何其他框架。异步应用程序与同步应用程序的整体思想是在等待结果时不会阻塞。不管它是如何实现的,是使用协程还是回调。

我的意思是,因为connection_handler一直在等待传入的消息,服务器只能在收到来自客户端的消息后采取行动,对吗?我遗漏了什么?

在同步应用程序中,您将编写类似msg = websocket.recv()的东西,它将阻塞整个应用程序,直到您收到消息(如您所述)。但是在异步应用程序中,这是完全不同的。

当你执行msg = yield from websocket.recv()时,你可以说:暂停connection_handler()的执行,直到websocket.recv()产生一些东西。在协程中使用yield from将控制返回给事件循环,因此当我们等待websocket.recv()的结果时,可以执行其他代码。请参考文档以更好地理解协程是如何工作的。

假设我们——另外——想要在某个事件发生时向客户端发送消息。为简单起见,让我们每隔60秒周期性地发送一条消息。我们该怎么做呢?

你可以使用asyncio.async()来运行任意多的协程,在执行阻塞调用开始事件循环之前。

import asyncio
import websockets
# here we'll store all active connections to use for sending periodic messages
connections = []

@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< {}'.format(msg))
        yield from connection.send(msg)
        print('> {}'.format(msg))

@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()

下面是一个示例客户端实现。它要求您输入名称,从echo服务器接收它,等待服务器的另外两条消息(这是我们的定期消息)并关闭连接。

import asyncio
import websockets

@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What's your name? ")
    yield from connection.send(name)
    print("> {}".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< {}".format(msg))
    yield from connection.close()

asyncio.get_event_loop().run_until_complete(hello())

重要的几点:

  1. 在Python 3.4.4中asyncio.async()被重命名为asyncio.ensure_future(),在Python 3.7中添加了asyncio.create_task(),优先于asyncio.ensure_future()

  2. 有特殊的方法来调度延迟调用,但它们不适用于协程。

同样的问题,很难得到解决,直到我看到完美的样本在这里:http://websockets.readthedocs.io/en/stable/intro.html#both

 done, pending = await asyncio.wait(
        [listener_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED)  # Important

因此,我可以处理多协程任务,如心跳和redis订阅。

如果您使用的是Python 3.7及以后的,您可以使用asyncio.gather()asyncio.run(),如下所示:

import asyncio
async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.
async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.
async def main():
    await asyncio.gather(
        coro1(),
        coro2(),
    )
asyncio.run(main())

## Or instead of defining the main async function:
futures = [coro1(), coro2()]
await asyncio.gather(*futures)

Python 3.11中添加了新的asyncio.TaskGroup,您不需要.gather():

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(coro1())
        tg.create_task(coro2())
asyncio.run(main())

否则,如果您正在使用Python 3.6或3.5,请执行以下操作以获得相同的结果,您也应该处理循环:

import asyncio
async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.
async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.
loop = asyncio.get_event_loop()
futures = [
    asyncio.ensure_future(coro1()),
    asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()

:

1
10
2
20
3
30
4
40
5
50

相关内容

  • 没有找到相关文章

最新更新