如何将具有计数结果的列添加到sqlalchemy查询中并通过pydantic检查



描述

以前,我的查询返回Stories表的内容。现在我想添加更多信息:我需要为每个故事输出prizes_count。Stories表中没有prizes_count字段,因此我编写了以下查询。

# Query each Stories entity together with a count labelled "prizes_count",
# joined to Prizes and grouped by the story id.
(
    db.query(
        models.Stories,
        func.count(models.Stories.prizes).label("prizes_count"),
    )
    .join(models.Prizes)
    .group_by(models.Stories.id)
    .all()
)

但我有两个问题。

  1. 我从Pydantic得到了验证错误,因为这个查询返回的是一个元组列表,例如(<database.models.Stories object at 0x0000026BB0055E20>, 1)。我必须把prizes_count的值放进Stories对象,或者反过来,把所有字段提取到元组中。当然,我可以手动完成,但我认为应该有更好的方法。
  2. 使用此查询时,我会丢失所有奖品数为0的故事,因为我的连接(join)会忽略它们。

代码

端点(Endpoint)

# GET /stories/ handler: delegates to crud.get_stories and lets the
# response_model (a list of schemas.StoryFullInfo) shape the output.
@app.get("/stories/", response_model=List[schemas.StoryFullInfo])
def get_stories(db: Session = Depends(get_db)):
    stories = crud.get_stories(db)
    return stories

CRUD

def get_stories(db: Session):
    """Run the grouped Stories/Prizes query and return all of its rows.

    The query selects ``models.Stories`` alongside a count labelled
    ``prizes_count``, joins ``models.Prizes`` and groups by
    ``models.Stories.id``. Two entities are selected, so each result row
    carries both the ``Stories`` object and the count value.
    """
    prizes_count = func.count(models.Stories.prizes).label("prizes_count")
    query = db.query(models.Stories, prizes_count)
    query = query.join(models.Prizes)
    query = query.group_by(models.Stories.id)
    return query.all()

模型(Models)

class Stories(Base):
    """Declarative model mapped to the ``stories`` table."""

    __tablename__ = "stories"

    # --- columns -----------------------------------------------------------
    id = Column(INTEGER(unsigned=True), primary_key=True)
    title = Column(String(length=128), index=True)
    text = Column(String(length=1000))
    # Required reference to users.id; updates and deletes cascade.
    author_id = Column(
        INTEGER(unsigned=True),
        ForeignKey("users.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    # The three columns below default to "0" on the server side.
    status = Column(TINYINT(unsigned=True), server_default="0")
    genre_type = Column(TINYINT(unsigned=True), server_default="0")
    likes_count = Column(INTEGER(unsigned=True), server_default="0")
    image = Column(Text)
    added_to_best_by = Column(INTEGER(unsigned=True))
    # Creation timestamp is filled in by the database via func.now().
    creation_DT = Column(DateTime, server_default=func.now())
    change_status_DT = Column(DateTime)

    # --- relationships -----------------------------------------------------
    author = relationship("Users", back_populates="stories")
    comments = relationship("Comments", back_populates="story")
    prizes = relationship("Prizes", back_populates="story")

class Prizes(Base):
    """Declarative model mapped to the ``prizes`` table."""

    __tablename__ = "prizes"

    # --- columns -----------------------------------------------------------
    id = Column(INTEGER(unsigned=True), primary_key=True)
    title = Column(String(length=128), nullable=False)
    image_id = Column(TINYINT(unsigned=True))
    # Required reference to stories.id; updates and deletes cascade.
    story_id = Column(
        INTEGER(unsigned=True),
        ForeignKey("stories.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    # Required reference to users.id; updates and deletes cascade.
    user_id = Column(
        INTEGER(unsigned=True),
        ForeignKey("users.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    text = Column(String(length=512), nullable=False)
    # Creation timestamp is filled in by the database via func.now().
    creation_DT = Column(DateTime, server_default=func.now())

    # --- relationships -----------------------------------------------------
    story = relationship("Stories", back_populates="prizes")
    author = relationship("Users", back_populates="prizes")

模式(Schemas)

# Minimal story schema: id, optional title (defaults to None) and author_id.
class StoryBaseInfo(BaseModel):
    id: int
    title: str = None
    author_id: int

    # orm_mode is switched on for this schema.
    class Config:
        orm_mode = True

# Extends StoryBaseInfo with the story's text, status fields and image data.
class StoryUpdateInfo(StoryBaseInfo):
    # `title` is not redeclared here; the base class already provides it.
    text: str = None
    status: int
    genre_type: int
    likes_count: int
    image: str = None
    added_to_best_by: int = None
    change_status_DT: datetime = None

    # orm_mode is switched on for this schema.
    class Config:
        orm_mode = True

# Extends StoryUpdateInfo with the creation timestamp, the nested author
# and the required integer prizes_count.
class StoryFullInfo(StoryUpdateInfo):
    creation_DT: datetime
    author: UserBaseInfo
    prizes_count: int

    # orm_mode is switched on for this schema.
    class Config:
        orm_mode = True

# Minimal prize schema: the prize id and the id of the story it refers to.
class PrizeBaseInfo(BaseModel):
    id: int
    story_id: int

    # orm_mode is switched on for this schema.
    class Config:
        orm_mode = True

# Extends PrizeBaseInfo with the prize's own fields plus the nested author
# and story schemas.
class PrizeInfo(PrizeBaseInfo):
    title: str
    image_id: int
    text: str
    creation_DT: datetime
    author: UserBaseInfo
    story: StoryBaseInfo

    # orm_mode is switched on for this schema.
    class Config:
        orm_mode = True

事实证明,我提这个问题时想错了方向。SQLAlchemy自身的功能就能解决这个问题:我可以利用已配置的关系(relationship)来统计奖品数量。我的解决方案是在Stories的SQLAlchemy模型中添加一个混合属性(hybrid property):

class Stories(Base):
    """Declarative model mapped to the ``stories`` table.

    In addition to the mapped columns and relationships, ``prizes_count``
    reports how many items the ``prizes`` relationship holds.
    """

    __tablename__ = "stories"

    # --- columns -----------------------------------------------------------
    id = Column(INTEGER(unsigned=True), primary_key=True)
    title = Column(String(length=128), index=True)
    text = Column(String(length=1000))
    # Required reference to users.id; updates and deletes cascade.
    author_id = Column(
        INTEGER(unsigned=True),
        ForeignKey("users.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    # The three columns below default to "0" on the server side.
    status = Column(TINYINT(unsigned=True), server_default="0")
    genre_type = Column(TINYINT(unsigned=True), server_default="0")
    likes_count = Column(INTEGER(unsigned=True), server_default="0")
    image = Column(Text)
    added_to_best_by = Column(INTEGER(unsigned=True))
    # Creation timestamp is filled in by the database via func.now().
    creation_DT = Column(DateTime, server_default=func.now())
    change_status_DT = Column(DateTime)

    # --- relationships -----------------------------------------------------
    author = relationship("Users", back_populates="stories")
    comments = relationship("Comments", back_populates="story")
    prizes = relationship("Prizes", back_populates="story")

    # --- derived values ----------------------------------------------------
    # NOTE(review): only the instance-level getter is defined here; no
    # SQL-side expression is visible, so this presumably cannot be used in
    # filters or ordering at the class level. Confirm whether that is needed.
    # NOTE(review): reading self.prizes presumably loads the related rows for
    # each story unless they are eager-loaded; verify the loader strategy.
    @hybrid_property
    def prizes_count(self):
        """Return the number of items in this story's ``prizes`` collection."""
        return len(self.prizes)

这样,下面的查询就能满足Pydantic模式(schema)的要求。

def get_stories(db: Session):
    """Return every ``models.Stories`` row from the given session."""
    stories_query = db.query(models.Stories)
    return stories_query.all()