我很难理解如何使用FastAPI和SQLModel在一对多关系中显示子数据。我使用Python 3.10.3, FastAPI版本0.78.0和SQLModel版本0.0.6。下面是父/子数据库模型的简化版本:
from datetime import datetime
from email.policy import default
from sqlalchemy import UniqueConstraint
from sqlmodel import Field, SQLModel, Relationship
# Fields shared by the customer table model and its create/read schemas.
class CustomerBase(SQLModel):
    # Declares a unique constraint on the ``email`` column.
    __table_args__ = (UniqueConstraint("email"),)

    first_name: str
    last_name: str
    email: str
    # Defaults to True when the caller supplies no value.
    active: bool | None = True
# Table model for customers (``table=True``).
class Customer(CustomerBase, table=True):
    # Primary key; defaults to None when not supplied.
    id: int | None = Field(default=None, primary_key=True)
# Schema for creating a customer; adds no fields beyond CustomerBase.
class CustomerCreate(CustomerBase):
    pass
# Schema for reading a customer: the shared fields plus a required ``id``.
class CustomerRead(CustomerBase):
    id: int
# Schema for reading a customer together with a list of its calls.
class CustomerReadWithCalls(CustomerRead):
    # "CallRead" is written as a string because that class is defined later.
    # The list is empty when the source object provides no ``calls`` value.
    calls: list["CallRead"] = []
# Fields shared by the call table model and its create/read schemas.
class CallBase(SQLModel):
    duration: int
    cost_per_minute: int | None = None
    # Optional foreign key to ``customer.id``.
    customer_id: int | None = Field(default=None, foreign_key="customer.id")
    # A factory is used so the value is computed per record. A plain
    # ``default=datetime.now().date()`` is evaluated once at import time and
    # would stamp every row with the process start date. The time part is
    # zeroed to keep the original date-only granularity.
    created: datetime = Field(
        nullable=False,
        default_factory=lambda: datetime.now().replace(
            hour=0, minute=0, second=0, microsecond=0
        ),
    )
# Table model for calls (``table=True``).
class Call(CallBase, table=True):
    # Primary key. ``default=None`` makes the field optional in line with its
    # ``int | None`` annotation, so a Call can be built before an id exists.
    id: int | None = Field(default=None, primary_key=True)
# Schema for creating a call; adds no fields beyond CallBase.
class CallCreate(CallBase):
    pass
# Schema for reading a call: the shared fields plus a required ``id``.
class CallRead(CallBase):
    id: int
# Schema for reading a call together with its customer, which may be None.
class CallReadWithCustomer(CallRead):
    customer: CustomerRead | None
下面是API路由:
from fastapi import APIRouter, HTTPException, Depends, Query
from rbi_app.crud.customer import (
get_customers,
get_customer,
)
from rbi_app.models import (
CustomerRead,
CustomerReadWithCalls,
)
from rbi_app.database import Session, get_session
# Router on which the customer endpoints below are registered.
router = APIRouter()
# List customers, optionally filtered by email, with offset/limit paging.
@router.get("/customers/", status_code=200, response_model=list[CustomerRead])
def read_customers(
    email: str = "",
    offset: int = 0,
    # ``le`` is the "less than or equal" validation keyword. The previous
    # ``lte`` is not a validation keyword, so the upper bound of 100 was
    # never enforced.
    limit: int = Query(default=100, le=100),
    db: Session = Depends(get_session),
):
    return get_customers(db, email, offset=offset, limit=limit)
# Return one customer by primary key, or respond with 404 when none exists.
@router.get("/customers/{customer_id}", status_code=200, response_model=CustomerReadWithCalls)
def read_customer(customer_id: int, db: Session = Depends(get_session)):
    # The parameter name must match the ``{customer_id}`` placeholder in the
    # path. A differently named parameter (previously ``id``) is treated by
    # FastAPI as a required query parameter instead of the path value.
    customer = get_customer(db, customer_id)
    if customer is None:
        # Renders exactly as the former ``{id=}`` expression did.
        raise HTTPException(
            status_code=404,
            detail=f"Customer not found for id={customer_id!r}",
        )
    return customer
下面是API路由端点对数据库的查询:
from sqlmodel import select
from rbi_app.database import Session
from rbi_app.models import (
Customer,
CustomerCreate,
)
# from rbi_app.schemas.customer import CustomerCreate
def get_customer(db: Session, id: int):
    """Look up a single customer by primary key.

    Returns the result of ``db.get(Customer, id)`` unchanged.
    """
    customer = db.get(Customer, id)
    return customer
def get_customers(db: Session, email: str = "", offset: int = 0, limit: int = 100):
    """Return customers as a list, optionally filtered by exact email.

    Args:
        db: Session used to execute the query.
        email: When non-empty, only customers with this exact email are
            returned and ``offset``/``limit`` are not applied.
        offset: Number of rows to skip in the unfiltered listing.
        limit: Maximum number of rows in the unfiltered listing.

    Returns:
        A list of ``Customer`` rows (possibly empty) in both branches.
    """
    statement = select(Customer)
    if email:
        # Return a list here too. The previous ``.first()`` gave back a single
        # object or None, which does not fit a caller expecting a list
        # (the route declares ``response_model=list[CustomerRead]``).
        return db.exec(statement.where(Customer.email == email)).all()
    paged = statement.offset(offset).limit(limit).order_by(Customer.id)
    return db.exec(paged).all()
当我访问获取所有客户或获取单个客户的路由时,查询能够正常执行并返回客户,但返回的客户中没有"calls"列表属性。OpenAPI文档中显示了"calls"属性,但它是空的。
我做错了什么?
这里的问题似乎是您没有在Customer
模型(或Call
模型)上定义关系。由于您使用Customer
模型查询数据库,并且它没有calls
属性,因此get_customer
函数返回的对象中不存在该数据。
尽管你的路由把CustomerReadWithCalls
定义为响应模型,但在调用该路由时,这个类的对象只能用路由处理函数返回的数据来实例化,也就是您的Customer
实例。因为它甚至没有calls
属性(更不用说数据了),所以CustomerReadWithCalls
对象实际上是用您为calls
字段定义的默认值创建的——空列表。
添加
calls: list["Call"] = Relationship(back_populates="customer")
到你的Customer
模型应该就足够了。
(但作为旁注,对我来说,路由文档只有在CallRead
定义后显式更新CustomerReadWithCalls
模型上的引用时才能正常工作)
下面是一个完整的工作示例。
models.py
from datetime import datetime
from sqlalchemy import UniqueConstraint
from sqlmodel import Field, Relationship, SQLModel
# Fields shared by the customer table model and its create/read schemas.
class CustomerBase(SQLModel):
    # Declares a unique constraint on the ``email`` column.
    __table_args__ = (UniqueConstraint("email"),)

    first_name: str
    last_name: str
    email: str
    # Defaults to True when the caller supplies no value.
    active: bool | None = True
# Table model for customers (``table=True``).
class Customer(CustomerBase, table=True):
    # Primary key; defaults to None when not supplied.
    id: int | None = Field(default=None, primary_key=True)

    # Relationship to this customer's calls. ``back_populates`` names the
    # ``customer`` attribute on the Call model as the other side.
    calls: list["Call"] = Relationship(back_populates="customer")
# Schema for creating a customer; adds no fields beyond CustomerBase.
class CustomerCreate(CustomerBase):
    pass
# Schema for reading a customer: the shared fields plus a required ``id``.
class CustomerRead(CustomerBase):
    id: int
# Schema for reading a customer together with a list of its calls.
class CustomerReadWithCalls(CustomerRead):
    # "CallRead" is written as a string because that class is defined later.
    calls: list["CallRead"] = []
# Fields shared by the call table model and its create/read schemas.
class CallBase(SQLModel):
    duration: int
    cost_per_minute: int | None = None
    # Optional foreign key to ``customer.id``.
    customer_id: int | None = Field(default=None, foreign_key="customer.id")
    # A factory is used so the value is computed per record. A plain
    # ``default=datetime.now().date()`` is evaluated once at import time and
    # would stamp every row with the process start date. The time part is
    # zeroed to keep the original date-only granularity.
    created: datetime = Field(
        nullable=False,
        default_factory=lambda: datetime.now().replace(
            hour=0, minute=0, second=0, microsecond=0
        ),
    )
# Table model for calls (``table=True``).
class Call(CallBase, table=True):
    # Primary key; defaults to None when not supplied.
    id: int | None = Field(default=None, primary_key=True)

    # Relationship to the owning customer. ``back_populates`` names the
    # ``calls`` attribute on the Customer model as the other side.
    customer: Customer | None = Relationship(back_populates="calls")
# Schema for creating a call; adds no fields beyond CallBase.
class CallCreate(CallBase):
    pass
# Schema for reading a call: the shared fields plus a required ``id``.
class CallRead(CallBase):
    id: int
# `CustomerReadWithCalls.calls` is annotated with the string "CallRead".
# After the definition of `CallRead`, update the forward reference to it:
CustomerReadWithCalls.update_forward_refs()
# Schema for reading a call together with its customer, which may be None.
class CallReadWithCustomer(CallRead):
    customer: CustomerRead | None
routes.py
from fastapi import FastAPI, HTTPException, Depends
from sqlmodel import Session, SQLModel, create_engine
from .models import CustomerReadWithCalls, Customer, Call
# Application object that the startup hook and the route below attach to.
api = FastAPI()
# SQLite database file name; the connection URL is built from it.
sqlite_file_name = 'database.db'
sqlite_url = f'sqlite:///{sqlite_file_name}'
# NOTE(review): echo=True presumably turns on SQL statement logging — confirm.
engine = create_engine(sqlite_url, echo=True)
@api.on_event('startup')
def initialize_db():
    """Drop and recreate all tables, then insert one customer with two calls."""
    metadata = SQLModel.metadata
    metadata.drop_all(engine)
    metadata.create_all(engine)
    # For testing:
    with Session(engine) as session:
        seeded_customer = Customer(first_name="Foo", last_name="Bar", email="foo@bar.com")
        seeded_calls = [Call(duration=123), Call(duration=456)]
        # Only the customer is added explicitly; the calls are attached
        # through the relationship list.
        seeded_customer.calls.extend(seeded_calls)
        session.add(seeded_customer)
        session.commit()
def get_session() -> Session:
    """Yield a session bound to ``engine`` and close it afterwards.

    Written as a generator so it can be used with ``Depends``; the session is
    closed when the generator is finalised, including when the consumer raised.
    """
    with Session(engine) as db_session:
        yield db_session
def get_customer(db: Session, id: int):
    """Look up a single customer by primary key.

    Returns the result of ``db.get(Customer, id)`` unchanged.
    """
    customer = db.get(Customer, id)
    return customer
# Return one customer by primary key, or respond with 404 when none exists.
@api.get("/customers/{customer_id}", status_code=200, response_model=CustomerReadWithCalls)
def read_customer(customer_id: int, db: Session = Depends(get_session)):
    found = get_customer(db, customer_id)
    if found is not None:
        return found
    # Nothing was found for this id: report it as HTTP 404.
    raise HTTPException(status_code=404, detail=f"Customer not found for {customer_id=}")
启动API服务器并向
http://127.0.0.1:8000/customers/1
发送GET请求,得到如下响应:
{
"first_name": "Foo",
"last_name": "Bar",
"email": "foo@bar.com",
"active": true,
"id": 1,
"calls": [
{
"duration": 123,
"cost_per_minute": null,
"customer_id": 1,
"created": "2022-08-16T00:00:00",
"id": 1
},
{
"duration": 456,
"cost_per_minute": null,
"customer_id": 1,
"created": "2022-08-16T00:00:00",
"id": 2
}
]
}
希望对你有帮助。