最小可复制示例:
import asyncio
import aiopg
from fastapi import FastAPI, WebSocket
dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"
app = FastAPI()
class ConnectionManager:
self.count_connections = 0
# other class functions and variables are taken from FastAPI docs
...
manager = ConnectionManager()
async def send_and_receive_data(websocket: WebSocket):
data = await websocket.receive_json()
await websocket.send_text('Thanks for the message')
# then process received data
# taken from official aiopg documentation
# the function listens to PostgreSQL notifications
async def listen(conn):
async with conn.cursor() as cur:
await cur.execute("LISTEN channel")
while True:
msg = await conn.notifies.get()
async def postgres_listen():
async with aiopg.connect(dsn) as listenConn:
listener = listen(listenConn)
await listener
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
manager.count_connections += 1
if manager.count_connections == 1:
await asyncio.gather(
send_and_receive_data(websocket),
postgres_listen()
)
else:
await send_and_receive_data(websocket)
问题描述:
我正在用Vue.js, FastAPI和PostgreSQL构建一个应用程序。在这个例子中,我尝试从Postgres中使用listen/notify并在websocket中实现它。我还使用了很多常用的http端点和websocket端点。
我想在FastAPI应用开始时运行一个永久的后台异步函数,然后向所有websocket客户端/连接发送消息。因此,当我使用uvicorn main:app
时,它不仅应该运行FastAPI应用程序,而且还应该运行我的后台函数postgres_listen()
,它通知所有websocket用户,当一个新行添加到数据库中的表中。
我知道我可以使用asyncio.create_task()
并将其放置在on_*
事件中,甚至将其放置在manager = ConnectionManager()
行之后,但它在我的情况下不起作用!因为在任何http请求(例如,read_root()
函数)之后,我将得到下面描述的相同错误。
你看到,我使用一种奇怪的方式来运行我的postgres_listen()
函数在我的websocket_endpoint()
函数只有当第一个客户端连接到websocket。任何后续客户端连接都不会再次运行/触发此函数。一切都很好……直到第一个客户端/用户断开连接(例如,关闭浏览器选项卡)。当它发生时,我立即得到psycopg2.OperationalError
引起的GeneratorExit
错误:
Future exception was never retrieved
future: <Future finished exception=OperationalError('Connection closed')>
psycopg2.OperationalError: Connection closed
Task was destroyed but it is pending!
task: <Task pending name='Task-18' coro=<Queue.get() done, defined at
/home/user/anaconda3/lib/python3.8/asyncio/queues.py:154> wait_for=<Future cancelled>>
错误来自listen()
函数。在此错误之后,由于asyncio的Task
被取消,我将不会从数据库获得任何通知。psycopg2
,aiopg
或asyncio
没有问题。问题是我不明白在哪里放置postgres_listen()
功能,因此在第一个客户端断开连接后它不会被取消。从我的理解,我可以很容易地写一个python脚本,将连接到websocket(所以我将是websocket的第一个客户端),然后永远运行,所以我不会得到psycopg2.OperationalError
异常再次,但它似乎不正确这样做。
我的问题是:我应该把postgres_listen()
函数放在哪里,所以第一次连接到websocket可能会断开而没有后果?
注:asyncio.shield()
也不工作
我也在Github上回答过这个问题,所以我在这里重新发布。
可以在这里找到一个工作示例:https://github.com/JarroVGIT/fastapi-github-issues/tree/master/5015
# app.py
import queue
from typing import Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from asyncio import Queue, Task
import asyncio
import uvicorn
import websockets
class Listener:
def __init__(self):
#Every incoming websocket conneciton adds it own Queue to this list called
#subscribers.
self.subscribers: list[Queue] = []
#This will hold a asyncio task which will receives messages and broadcasts them
#to all subscribers.
self.listener_task: Task
async def subscribe(self, q: Queue):
#Every incoming websocket connection must create a Queue and subscribe itself to
#this class instance
self.subscribers.append(q)
async def start_listening(self):
#Method that must be called on startup of application to start the listening
#process of external messages.
self.listener_task = asyncio.create_task(self._listener())
async def _listener(self) -> None:
#The method with the infinite listener. In this example, it listens to a websocket
#as it was the fastest way for me to mimic the 'infinite generator' in issue 5015
#but this can be anything. It is started (via start_listening()) on startup of app.
async with websockets.connect("ws://localhost:8001") as websocket:
async for message in websocket:
for q in self.subscribers:
#important here: every websocket connection has its own Queue added to
#the list of subscribers. Here, we actually broadcast incoming messages
#to all open websocket connections.
await q.put(message)
async def stop_listening(self):
#closing off the asyncio task when stopping the app. This method is called on
#app shutdown
if self.listener_task.done():
self.listener_task.result()
else:
self.listener_task.cancel()
async def receive_and_publish_message(self, msg: Any):
#this was a method that was called when someone would make a request
#to /add_item endpoint as part of earlier solution to see if the msg would be
#broadcasted to all open websocket connections (it does)
for q in self.subscribers:
try:
q.put_nowait(str(msg))
except Exception as e:
raise e
#Note: missing here is any disconnect logic (e.g. removing the queue from the list of subscribers
# when a websocket connection is ended or closed.)
global_listener = Listener()
app = FastAPI()
@app.on_event("startup")
async def startup_event():
await global_listener.start_listening()
return
@app.on_event("shutdown")
async def shutdown_event():
await global_listener.stop_listening()
return
@app.get('/add_item/{item}')
async def add_item(item: str):
#this was a test endpoint, to see if new items where actually broadcasted to all
#open websocket connections.
await global_listener.receive_and_publish_message(item)
return {"published_message:": item}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
q: asyncio.Queue = asyncio.Queue()
await global_listener.subscribe(q=q)
try:
while True:
data = await q.get()
await websocket.send_text(data)
except WebSocketDisconnect:
return
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
由于我无法访问我可以订阅的消息流,所以我创建了一个生成websocket的快速脚本,以便上面的app.py可以(无限期地)侦听该消息来模拟您的用例。
# generator.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import uvicorn
app = FastAPI()
@app.websocket("/")
async def ws(websocket: WebSocket):
await websocket.accept()
i = 0
while True:
try:
await websocket.send_text(f"Hello - {i}")
await asyncio.sleep(2)
i+=1
except WebSocketDisconnect:
pass
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8001)
app.py将监听websocket,并将所有传入消息发布到app.py中所有与websocket连接的消息。
generator.py是一个简单的FastAPI应用程序,它有一个websocket(上面我们的示例app.py侦听的),它每2秒向它获得的每个连接发出一条消息。
试一下:
- 启动generator.py(例如python3 generator.py)
- 启动app.py(在VScode中调试模式或与上述相同)
- 用几个客户端听http://localhost:8000/ws (= endpoint in app.py),您将看到它们都将加入相同的消息条纹。
注意:这里的很多逻辑都是受broadcast(一个python模块)的启发