我正在使用FastAPI和WebSockets来";"推";SVG发送给客户问题是:如果迭代持续运行,它们会阻塞异步事件循环,因此套接字无法侦听其他消息。
将循环作为后台任务运行是不合适的,因为每次迭代都占用大量CPU,并且数据必须返回到客户端。
是否有不同的方法,或者我需要从客户端触发每个步骤?我认为多处理可以工作,但不确定如何使用像await websocket.send_text()
这样的异步代码。
@app.websocket("/ws")
async def read_websocket(websocket: WebSocket) -> None:
await websocket.accept()
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
#needed to run the steps until the user sends "stop"
while True:
svg_string = get_step_data()
await websocket.send_text(svg_string)
if data == "status":
await run_continuous_iterations()
#this code can't run if the event loop is blocked by run_continuous_iterations
if data == "stop":
is_running = False
print("Stopping process")
"。。。每次迭代的CPU都很重,数据必须返回到客户";。
如在这个答案中所描述的;协程只有当它明确地请求被挂起时才挂起它的执行";,例如,如果存在对异步操作/函数的await
调用;正常地到非阻塞I/O-bound
任务,例如这里描述的任务(注意:FastAPI/Starlette运行I/O-bound
方法,例如使用async
run_in_threadpool()
函数读取外部线程池中的File
内容,然后使用await
;因此,从async def
端点(例如await file.read()
(调用此类File
操作不会阻止事件循环——请查看上面的链接答案以了解更多详细信息(。然而,这不适用于阻塞I/O-bound
或CPU-bound
操作,例如这里提到的操作。在async def
端点内运行此类操作将阻塞事件循环;因此,任何进一步的客户端请求都将被阻止,直到阻止操作完成。
此外,从您提供的代码片段来看,您似乎希望将数据发送回客户端,同时侦听新消息(以检查客户端是否发送了"停止"消息,从而停止进程(。因此,await
等待一个操作完成是而不是的方法,而是在一个单独的线程或进程中执行该任务(如果这是一个CPU绑定的任务(——如这个答案所示,但如果没有await
,这应该是一种更合适的方式(注意:进程有自己的内存,因此,在多个进程之间共享websocket连接在本机上是不可行的——请查看此处和此处以获取可用选项(。下面给出了使用单独线程的解决方案。
使用asyncio
的loop.run_in_executor()
和ThreadPoolExecutor
传递None
作为executor参数loop.run_in_executor()
,将使用默认的executor;即CCD_ 22。
import asyncio
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
is_running = True
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
while is_running:
svg_string = get_step_data()
await websocket.send_text(svg_string)
if data == "status":
is_running = True
loop = asyncio.get_running_loop()
loop.run_in_executor(None, lambda: asyncio.run(run_continuous_iterations()))
if data == "stop":
is_running = False
print("Stopping process")
except WebSocketDisconnect:
is_running = False
print("Client disconnected")
使用asyncio
的loop.run_in_executor()
和自定义ThreadPoolExecutor
:
import concurrent.futures
#... rest of the code is the same as above
@app.on_event("startup")
def startup_event():
# instantiate the ThreadPool
app.state.pool = concurrent.futures.ThreadPoolExecutor()
@app.on_event("shutdown")
def shutdown_event():
# terminate the ThreadPool
app.state.pool.shutdown()
#... rest of the code is the same as above
if data == "status":
is_running = True
loop = asyncio.get_running_loop()
loop.run_in_executor(app.state.pool, lambda: asyncio.run(run_continuous_iterations()))
#... rest of the code is the same as above
使用threading
的Thread
:
#... rest of the code is the same as above
if data == "status":
is_running = True
thread = threading.Thread(target=lambda: asyncio.run(run_continuous_iterations()))
thread.start()
#... rest of the code is the same as above