如何避免使用循环函数阻塞异步事件循环



我正在使用FastAPI和WebSockets来";"推";SVG发送给客户问题是:如果迭代持续运行,它们会阻塞异步事件循环,因此套接字无法侦听其他消息。

将循环作为后台任务运行是不合适的,因为每次迭代都占用大量CPU,并且数据必须返回到客户端。

是否有不同的方法,或者我需要从客户端触发每个步骤?我认为多处理可以工作,但不确定如何使用像await websocket.send_text()这样的异步代码。

@app.websocket("/ws")
async def read_websocket(websocket: WebSocket) -> None:
await websocket.accept()
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
#needed to run the steps until the user sends "stop"
while True:
svg_string = get_step_data()
await websocket.send_text(svg_string) 
if data == "status":
await run_continuous_iterations()
#this code can't run if the event loop is blocked by run_continuous_iterations
if data == "stop":
is_running = False
print("Stopping process")

"。。。每次迭代的CPU都很重,数据必须返回到客户";。

如在这个答案中所描述的;协程只有当它明确地请求被挂起时才挂起它的执行";,例如,如果存在对异步操作/函数的await调用;正常地到非阻塞I/O-bound任务,例如这里描述的任务(注意:FastAPI/Starlette运行I/O-bound方法,例如使用asyncrun_in_threadpool()函数读取外部线程池中的File内容,然后使用await;因此,从async def端点(例如await file.read()(调用此类File操作不会阻止事件循环——请查看上面的链接答案以了解更多详细信息(。然而,这不适用于阻塞I/O-boundCPU-bound操作,例如这里提到的操作。在async def端点内运行此类操作将阻塞事件循环;因此,任何进一步的客户端请求都将被阻止,直到阻止操作完成。

此外,从您提供的代码片段来看,您似乎希望将数据发送回客户端,同时侦听新消息(以检查客户端是否发送了"停止"消息,从而停止进程(。因此,await等待一个操作完成是而不是的方法,而是在一个单独的线程或进程中执行该任务(如果这是一个CPU绑定的任务(——如这个答案所示,但如果没有await,这应该是一种更合适的方式(注意:进程有自己的内存,因此,在多个进程之间共享websocket连接在本机上是不可行的——请查看此处和此处以获取可用选项(。下面给出了使用单独线程的解决方案。

使用asyncioloop.run_in_executor()ThreadPoolExecutor

传递None作为executor参数loop.run_in_executor(),将使用默认的executor;即CCD_ 22。

import asyncio
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
is_running = True
await websocket.accept()

try:
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
while is_running:
svg_string = get_step_data()
await websocket.send_text(svg_string)

if data == "status":
is_running = True
loop = asyncio.get_running_loop()
loop.run_in_executor(None, lambda: asyncio.run(run_continuous_iterations()))
if data == "stop":
is_running = False
print("Stopping process")

except WebSocketDisconnect:
is_running = False
print("Client disconnected")  

使用asyncioloop.run_in_executor()和自定义ThreadPoolExecutor:

import concurrent.futures
#...  rest of the code is the same as above
@app.on_event("startup")
def startup_event():
# instantiate the ThreadPool
app.state.pool = concurrent.futures.ThreadPoolExecutor()
@app.on_event("shutdown")
def shutdown_event():  
# terminate the ThreadPool
app.state.pool.shutdown()

#...  rest of the code is the same as above

if data == "status":
is_running = True
loop = asyncio.get_running_loop()
loop.run_in_executor(app.state.pool, lambda: asyncio.run(run_continuous_iterations()))
#...  rest of the code is the same as above

使用threadingThread:

#...  rest of the code is the same as above

if data == "status":
is_running = True
thread = threading.Thread(target=lambda: asyncio.run(run_continuous_iterations()))
thread.start()
#...  rest of the code is the same as above

最新更新