asyncio :: 访问在另一个循环中创建的 WebSocket



我正在尝试将某些函数转换为非异步函数,所以我在run_until_complete中运行它们; 但我想在 NOT 异步函数中创建一个 websocket,然后从多个函数中使用该 websocket 其他函数也不是异步的。尝试获取循环并运行另一个函数会给我一个异常,即已经有一个循环正在运行。 有没有办法做到这一点? 谢谢!

请注意,您正在尝试的内容与 asyncio 的设计背道而驰,其中的想法是整个程序(或至少与服务器通信的整个线程(都是异步的。但是,仍然可以将异步功能作为同步执行,您只需要使用一些技巧。

最好的办法是创建一个运行事件循环的专用线程,并使用asyncio.run_coroutine_threadsafe()将任务提交到事件循环。

您甚至可以将其抽象为装饰器,将任何异步方法转换为同步:

import threading, asyncio
def start_asyncio():
loop = None
ready = threading.Event()
async def wait_forever():
nonlocal loop
loop = asyncio.get_event_loop()
ready.set()
await loop.create_future()
threading.Thread(daemon=True, target=asyncio.run,
args=(wait_forever(),)).start()
ready.wait()
return loop
def syncify(fn):
def syncfn(*args, **kwds):
# submit the original coroutine to the event loop
# and wait for the result
conc_future = asyncio.run_coroutine_threadsafe(
fn(*args, **kwds), _loop)
return conc_future.result()
syncfn.as_async = fn
return syncfn

修饰器创建一个同步适配器,该适配器将基础异步函数提交到事件循环并等待结果。它可以用于像这样的网络套接字:

import websockets
_loop = start_asyncio()
@syncify
async def open_websocket(uri):
ws = await websockets.connect(uri)
return ws
@syncify
async def send_to_websocket(ws, msg):
await ws.send(msg)
@syncify
async def recv_from_websocket(ws):
return await ws.recv()
@syncify
async def start_echo_server(host, port):
async def _echo(ws, _path):
msg = await ws.recv()
await ws.send('echo ' + msg)
await websockets.serve(_echo, host, port)

所有这些函数都是用async def定义,但是由于装饰器,它们可以作为同步函数调用。这是一个测试:

def test():
start_echo_server("localhost", 8765)
ws = open_websocket("ws://localhost:8765")
send_to_websocket(ws, "hi there")
assert recv_from_websocket(ws) == "echo hi there"
if __name__ == '__main__':
test()
print('ok')

请注意,不允许从一个"sincified"函数调用另一个函数,因为它会阻塞事件循环。但这就是为什么装饰器提供了一个逃生舱口,即as_async属性,它允许您执行以下操作:

@syncify
async def talk(ws):
async with aiohttp.get(some_url) as resp:
data = await resp.data()
await send_to_websocket.as_async(ws, data)

这种方法的缺点是,在 Python 中,run_coroutine_threadsafe需要不可忽略的时间,尽管它只需要将作业提交到正在运行的事件循环中。上次我测量时,调用asyncio.run(x())实际上比asyncio.run_coroutine_threadsafe(x(), loop)更快,尽管前者设置并拆除了整个事件循环。不过,正如您所发现的,基于loop.run_until_completeasyncio.run的解决方案具有无法嵌套的限制。

最新更新