FastAPI:如何在数据库有新条目时通过 WebSocket 返回数据



我正在尝试编写一个简单的API来收集测量值,然后通过FastAPI的websocket将它们实时流式传输给客户端。有很多关于如何在websocket管理器触发时发送消息的教程,但是我很难让数据库触发器发送消息。下面是我到目前为止写的:

main.py:

from fastapi import FastAPI, Depends, WebSocket, WebSocketDisconnect
from sql_app.database import engine, Session
from sql_app import models
from fastapi.encoders import jsonable_encoder
import sql_app.schemas as schemas
from sql_app.database import Base, get_db
import datetime
import uvicorn

def create_tables():
    """Create all tables registered on ``Base.metadata`` using ``engine``."""
    print("Creating Tables..")
    metadata = Base.metadata
    metadata.create_all(bind=engine)
# Application object; the route decorators below register their handlers on it.
app = FastAPI()

@app.post("/measurement/")
async def create_measurement(measurement: schemas.MeasurementCreate, db: Session = Depends(get_db)):
    """Store one measurement and return the stored row.

    Args:
        measurement: Request payload; its fields are passed to ``models.Measurement``
            as keyword arguments.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        The ``models.Measurement`` instance after commit and refresh.
    """
    payload = measurement.dict()
    row = models.Measurement(**payload)
    db.add(row)
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(row)
    return row

@app.post("/create_device/")
async def create_device(device: schemas.DeviceCreate, db: Session = Depends(get_db)):
    """Insert a new device row and return it.

    Args:
        device: Request payload; the five fields read below are copied onto the new row.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        The persisted ``models.Device`` instance.
    """
    new_device = models.Device(
        device_key=device.device_key,
        name=device.name,
        hardware=device.hardware,
        firmware=device.firmware,
        software=device.software,
    )
    db.add(new_device)
    db.commit()
    # Session.refresh() reloads the instance in place and returns None, so
    # returning its result made this endpoint respond with null. Refresh first,
    # then return the object itself (same pattern as create_measurement).
    db.refresh(new_device)
    return new_device
@app.get("/measurement/")
async def get_measurements(db: Session = Depends(get_db)):
    """Return every measurement whose timestamp falls within the last 30 days."""
    # The cutoff uses the server's naive local time.
    cutoff = datetime.datetime.now() - datetime.timedelta(days=30)
    recent = db.query(models.Measurement).filter(models.Measurement.timestamp >= cutoff)
    return recent.all()


@app.websocket("/ws")
async def dashboard_data(websocket: WebSocket, db: Session = Depends(get_db)):
    """Send the last 30 days of measurements once, then forward ``measurement_stream`` results.

    NOTE(review): ``Depends(get_db)`` inside the loop is handed over as a plain
    object (FastAPI resolves ``Depends`` only when it is declared in a route
    signature), and ``models.measurement_stream`` returns immediately, so this
    loop sends as fast as it can run instead of once per database insert.
    """
    await websocket.accept()

    # Initial snapshot for the freshly connected client.
    cutoff = datetime.datetime.now() - datetime.timedelta(days=30)
    recent = db.query(models.Measurement).filter(models.Measurement.timestamp >= cutoff).all()
    await websocket.send_json(jsonable_encoder(recent))

    try:
        while True:
            data = await models.measurement_stream(Depends(get_db))
            await websocket.send_text(data)
    except WebSocketDisconnect:
        # Client went away; end the handler.
        return None
if __name__ == "__main__":
    # Start uvicorn on localhost:8000 only when this file is run directly.
    uvicorn.run(app, host="localhost", port=8000)

sql_app/models.py:

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Float, DateTime, event
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.orm import relationship
from fastapi import Depends
from .database import Base, get_db
import datetime
from pytz import timezone
from sql_app.database import Session

class Measurement(Base):
    """ORM model for rows of the ``measurements`` table."""

    __tablename__ = "measurements"

    id = Column(Integer, primary_key=True, index=True)
    # Foreign key to the device row this measurement belongs to.
    device_key = Column(String(length=40), ForeignKey("devices.device_key"))
    inside_temp = Column(Float)
    outside_temp = Column(Float)
    inside_humidity = Column(Float)
    outside_humidity = Column(Float)
    current_capacity = Column(Float)
    # Defaults to the current America/Los_Angeles time when no value is supplied.
    timestamp = Column(DateTime, default=lambda: datetime.datetime.now(tz=timezone('America/Los_Angeles')))
    device = relationship("Device", back_populates="measurements")

    def _as_dict(self):
        """Return a ``{column name: value}`` mapping covering every table column."""
        snapshot = {}
        for column in self.__table__.columns:
            snapshot[column.name] = getattr(self, column.name)
        return snapshot

@event.listens_for(Measurement, "after_insert")
async def measurement_stream(db: Session):
    """Placeholder registered for ``Measurement`` ``after_insert``; always returns ``"test"``.

    The ``db`` argument is currently unused.

    NOTE(review): SQLAlchemy invokes ``after_insert`` listeners synchronously as
    ``fn(mapper, connection, target)``; a coroutine function taking a single
    ``db`` parameter does not match that call — confirm before relying on it.
    """
    # Intended payload, not enabled yet:
    # db.query(Measurement).filter(Measurement.timestamp >= datetime.datetime.now() - datetime.timedelta(days=30)).all()
    return "test"

class Device(Base):
    """ORM model for rows of the ``devices`` table."""

    __tablename__ = "devices"

    # Primary key; ``Measurement.device_key`` references this column.
    device_key = Column(String(length=40), unique=True, primary_key=True)
    name = Column(String)
    hardware = Column(String)
    firmware = Column(String)
    software = Column(String)
    # Reverse side of ``Measurement.device``.
    measurements = relationship("Measurement", back_populates="device")

如果我现在运行它,它只会以循环运行的速度不断返回"test"。如何让消息仅在数据库有更新时才发送?注意:我甚至不确定在 Measurement ORM 类上使用事件监听器的做法是否正确。

我想我找到了答案,尽管我不确定这样做可能会带来哪些其他后果。不过,我的方案在多个客户端同时连接时似乎也能正常工作。

@app.websocket("/ws")
async def dashboard_data(websocket: WebSocket, db: Session = Depends(get_db)):
    """Stream the last 30 days of measurements, re-sending them after every insert.

    The full 30-day list is sent once on connect and again each time a
    ``Measurement`` ``after_insert`` event fires.

    Requires ``import asyncio`` and ``from sqlalchemy import event`` in this module.

    Args:
        websocket: The client connection.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        ``None`` once the client disconnects.
    """
    flag = asyncio.Event()

    def measurement_stream(*args, **kwargs):
        # NOTE(review): asyncio.Event is not thread-safe. This relies on inserts
        # running on the event-loop thread, which holds while the POST route is
        # declared ``async def`` — confirm if that route ever becomes sync.
        flag.set()
        print("event set")

    def recent_measurements():
        """Return the JSON-ready list of measurements from the last 30 days."""
        cutoff = datetime.datetime.now() - datetime.timedelta(days=30)
        rows = db.query(models.Measurement).filter(models.Measurement.timestamp >= cutoff).all()
        return jsonable_encoder(rows)

    await websocket.accept()
    event.listen(models.Measurement, "after_insert", measurement_stream)
    try:
        await websocket.send_json(recent_measurements())
        while True:
            await flag.wait()
            # Clear before querying and sending: an insert that lands while the
            # send is awaited sets the flag again and triggers another pass,
            # instead of being wiped out by a clear() after the send.
            flag.clear()
            await websocket.send_json(recent_measurements())
    except WebSocketDisconnect:
        return None
    finally:
        # Without this, every connection leaves a listener (and its closure)
        # registered on the model forever, and each later insert keeps calling it.
        event.remove(models.Measurement, "after_insert", measurement_stream)

基本上,我在这里所做的是把事件监听器放进 websocket 路由中:当新的测量值写入数据库时,监听器被触发并设置标志,循环得以继续执行一次迭代;随后标志被清除,循环必须等待监听器再次触发并设置标志。正如我所说,这种做法可能会带来其他性能或稳定性方面的影响,所以如果有人有更好的解决方案,请告诉我。

最新更新