我使用websockets
库在Python 3.4中创建websocket服务器。下面是一个简单的回显服务器:
import asyncio
import websockets
@asyncio.coroutine
def connection_handler(websocket, path):
while True:
msg = yield from websocket.recv()
if msg is None: # connection lost
break
yield from websocket.send(msg)
start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
假设我们——另外——想要在某个事件发生时向客户端发送消息。为简单起见,让我们每隔60秒周期性地发送一条消息。我们该怎么做呢?我的意思是,因为connection_handler
一直在等待传入的消息,服务器只能在收到来自客户机的消息后才能采取操作,对吗?我遗漏了什么?
您可以使用gather
来自Python文档:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
TL;DR您可以使用asyncio.create_task()
同时运行多个协程
也许这个场景需要一个基于事件/回调的框架,而不是一个基于协程的框架?龙卷风吗?
不,您不需要任何其他框架。异步应用程序与同步应用程序的整体思想是在等待结果时不会阻塞。不管它是如何实现的,是使用协程还是回调。
我的意思是,因为connection_handler一直在等待传入的消息,服务器只能在收到来自客户端的消息后采取行动,对吗?我遗漏了什么?
在同步应用程序中,您将编写类似msg = websocket.recv()
的东西,它将阻塞整个应用程序,直到您收到消息(如您所述)。但是在异步应用程序中,这是完全不同的。
当你执行msg = yield from websocket.recv()
时,你可以说:暂停connection_handler()
的执行,直到websocket.recv()
产生一些东西。在协程中使用yield from
将控制返回给事件循环,因此当我们等待websocket.recv()
的结果时,可以执行其他代码。请参考文档以更好地理解协程是如何工作的。
假设我们——另外——想要在某个事件发生时向客户端发送消息。为简单起见,让我们每隔60秒周期性地发送一条消息。我们该怎么做呢?
你可以使用asyncio.async()
来运行任意多的协程,在执行阻塞调用开始事件循环之前。
import asyncio
import websockets
# here we'll store all active connections to use for sending periodic messages
connections = []
@asyncio.coroutine
def connection_handler(connection, path):
connections.append(connection) # add connection to pool
while True:
msg = yield from connection.recv()
if msg is None: # connection lost
connections.remove(connection) # remove connection from pool, when client disconnects
break
else:
print('< {}'.format(msg))
yield from connection.send(msg)
print('> {}'.format(msg))
@asyncio.coroutine
def send_periodically():
while True:
yield from asyncio.sleep(5) # switch to other code and continue execution in 5 seconds
for connection in connections:
print('> Periodic event happened.')
yield from connection.send('Periodic event happened.') # send message to each connected client
start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()
下面是一个示例客户端实现。它要求您输入名称,从echo服务器接收它,等待服务器的另外两条消息(这是我们的定期消息)并关闭连接。
import asyncio
import websockets
@asyncio.coroutine
def hello():
connection = yield from websockets.connect('ws://localhost:8000/')
name = input("What's your name? ")
yield from connection.send(name)
print("> {}".format(name))
for _ in range(3):
msg = yield from connection.recv()
print("< {}".format(msg))
yield from connection.close()
asyncio.get_event_loop().run_until_complete(hello())
重要的几点:
在Python 3.4.4中
asyncio.async()
被重命名为asyncio.ensure_future()
,在Python 3.7中添加了asyncio.create_task()
,优先于asyncio.ensure_future()
。有特殊的方法来调度延迟调用,但它们不适用于协程。
同样的问题,很难得到解决,直到我看到完美的样本在这里:http://websockets.readthedocs.io/en/stable/intro.html#both
done, pending = await asyncio.wait(
[listener_task, producer_task],
return_when=asyncio.FIRST_COMPLETED) # Important
因此,我可以处理多协程任务,如心跳和redis订阅。
如果您使用的是Python 3.7及以后的,您可以使用asyncio.gather()
和asyncio.run()
,如下所示:
import asyncio
async def coro1():
for i in range(1, 6):
print(i)
await asyncio.sleep(0) # switches task every one iteration.
async def coro2():
for i in range(1, 6):
print(i * 10)
await asyncio.sleep(0) # switches task every one iteration.
async def main():
await asyncio.gather(
coro1(),
coro2(),
)
asyncio.run(main())
## Or instead of defining the main async function:
futures = [coro1(), coro2()]
await asyncio.gather(*futures)
在Python 3.11中添加了新的asyncio.TaskGroup
,您不需要.gather()
:
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(coro1())
tg.create_task(coro2())
asyncio.run(main())
否则,如果您正在使用Python 3.6或3.5,请执行以下操作以获得相同的结果,您也应该处理循环:
import asyncio
async def coro1():
for i in range(1, 6):
print(i)
await asyncio.sleep(0) # switches task every one iteration.
async def coro2():
for i in range(1, 6):
print(i * 10)
await asyncio.sleep(0) # switches task every one iteration.
loop = asyncio.get_event_loop()
futures = [
asyncio.ensure_future(coro1()),
asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()
:
1
10
2
20
3
30
4
40
5
50