我使用 SQLAlchemy 定义了一个关联对象来表示两个表之间的多对多关系。之所以使用关联对象模式,是因为关联表包含额外的列。我对 data_type 表中的 name 列设置了唯一约束。当我尝试向 source_key 插入数据并创建关系时,会出现以下错误。我的问题是:如果对应的 data_type 记录已经存在,如何获取它的 ID 并添加到关联表中;否则,先在 data_type 中创建记录,再添加到关联表?
the-librarian-backend-1 | sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "ix_data_type_name"
the-librarian-backend-1 | DETAIL: Key (name)=(str) already exists.
the-librarian-backend-1 |
the-librarian-backend-1 | [SQL: INSERT INTO data_type (name) VALUES (%(name)s) RETURNING data_type.id]
the-librarian-backend-1 | [parameters: ({'name': 'str'}, {'name': 'str'}, {'name': 'str'}, {'name': 'str'}, {'name': 'str'}, {'name': 'date'}, {'name': 'list'}, {'name': 'int'} ... displaying 10 of 747 total bound parameter sets ... {'name': 'date'}, {'name': 'str'})]
模型:
# source_key.py
class SourceKey(Base):
    """ORM model mapped to the ``source_key`` table."""

    __tablename__ = 'source_key'

    id = Column(Integer, primary_key=True, index=True)
    source_id = Column(Integer, ForeignKey('source.id'), nullable=False)
    key_id = Column(Integer, ForeignKey('key.id'), nullable=False)
    description = Column(Text)

    # Holds SourceKeyDataType association objects (not DataType instances);
    # paired with SourceKeyDataType.source_keys via back_populates.
    data_types = relationship("SourceKeyDataType", back_populates="source_keys")
# data_type.py
class DataType(Base):
    """ORM model mapped to the ``data_type`` table."""

    __tablename__ = 'data_type'

    id = Column(Integer, primary_key=True, index=True)
    # Unique and indexed: inserting a second row with an existing name fails
    # with a unique-constraint violation.
    name = Column(Text, index=True, nullable=False, unique=True)

    # Holds SourceKeyDataType association objects; paired with
    # SourceKeyDataType.data_types via back_populates.
    source_keys = relationship("SourceKeyDataType", back_populates="data_types")
# Association Object
class SourceKeyDataType(Base):
    """Association object linking ``source_key`` and ``data_type`` rows.

    The composite primary key is (source_key_id, data_type_id); ``count`` is
    the extra per-link column.
    """

    __tablename__ = 'source_key_data_type_assoc'

    source_key_id = Column(ForeignKey('source_key.id'), primary_key=True)
    data_type_id = Column(ForeignKey('data_type.id'), primary_key=True)
    count = Column(BigInteger)

    # Despite the plural names, each attribute targets the parent on one side
    # of the link: SourceKey and DataType respectively.
    source_keys = relationship("SourceKey", back_populates="data_types")
    data_types = relationship("DataType", back_populates="source_keys")
# Code: sample input payload. Each entry carries a nested "key" dict, a
# scalar "description", and a list of "data_types" entries (name + count).
source_keys = [
    {
        "key": {
            "name": "total"
        },
        "description": "the total cost of all items",
        "data_types": [
            {
                "name": "str",
                "count": 1904165
            }
        ]
    },
    {
        "key": {
            "name": "item_value"
        },
        "description": "the cost of a single item",
        "data_types": [
            {
                "name": "str",
                "count": 2079817
            }
        ]
    }
]
# NOTE(review): the original paste lost its indentation, so the placement of
# add/commit/refresh inside the outer loop is reconstructed -- confirm.
for source_key in source_keys:
    # Keep only scalar fields; the nested "key" dict and "data_types" list
    # are filtered out by the isinstance check.
    source_key_obj = {
        k: v
        for k, v in source_key.items()
        if isinstance(v, (str, int, bool, float))
    }
    source_key_db_obj = SourceKey(**source_key_obj)
    for dt in source_key.get("data_types") or []:
        # NOTE(review): is_inferred must exist as a mapped attribute on
        # SourceKeyDataType for this constructor call to work -- confirm.
        a = SourceKeyDataType(
            is_inferred=dt.get("is_inferred", False),
            count=dt.get("count", 0),
        )
        # A brand-new DataType is created for every entry, even if a row with
        # this name already exists; this is what triggers the unique-constraint
        # violation on data_type.name.
        a.data_types = models.DataType(name=dt["name"])
        source_key_db_obj.data_types.append(a)
    db.add(source_key_db_obj)
    db.commit()
    db.refresh(source_key_db_obj)
> 我的问题是:怎样才能做到“如果 ID 已存在,就获取它并添加到关联表;否则,先在 data_type 中创建记录,再添加到关联表”?

这个逻辑需要由你自己的代码来实现。下面来看一个简化的示例,它使用的是关联表而不是关联对象。首先搭建测试环境:
from sqlalchemy import Column, create_engine, ForeignKey, Integer, select, String, Table
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, relationship, Session
# "sqlite://" with no path is an in-memory SQLite database, so the example
# needs no external setup.
engine = create_engine("sqlite://")
Base = declarative_base()

# Plain association table (no extra columns) linking posts to tags; the two
# foreign keys together form the composite primary key.
post_tag = Table(
    "post_tag",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("post.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tag.id"), primary_key=True),
)
class Post(Base):
    """ORM model mapped to the ``post`` table."""

    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    title = Column(String)

    # Many-to-many to Tag through the post_tag association table.
    tags = relationship("Tag", secondary=post_tag)
class Tag(Base):
    """ORM model mapped to the ``tag`` table."""

    __tablename__ = "tag"

    id = Column(Integer, primary_key=True)
    # Unique: a second Tag row with the same name is rejected by the database.
    name = Column(String, unique=True)
# Create every table registered on Base.metadata.
Base.metadata.create_all(engine)

# add test data into empty tables
with Session(engine) as sess:
    sess.add(
        Post(
            title="getting unique constraint violation",
            # Creates the first (and so far only) Tag named "SQLAlchemy".
            tags=[Tag(name="SQLAlchemy")],
        )
    )
    sess.commit()
首先,让我们尝试用最简单的方式添加一个新的Post:
# 1st try: adding a new post and blindly creating a new Tag object
with Session(engine) as sess:
    sess.add(
        Post(
            title="some other issue",
            # A fresh Tag object means a fresh INSERT into tag, which collides
            # with the unique name column when that name is already stored.
            tags=[Tag(name="SQLAlchemy")],
        )
    )
    try:
        sess.commit()
    except IntegrityError:
        print("1st try: An IntegrityError has occurred")
        # this error gets printed
现在让我们检查一下标签是否已经存在:
# 2nd try: check for existing tag first
with Session(engine) as sess:
    # Look the tag up by its unique name; .first() yields None when no row
    # matches.
    sqla_tag = sess.scalars(
        select(Tag).where(Tag.name == "SQLAlchemy")
    ).first()
    if not sqla_tag:
        # Tag object does not already exist, so create it
        sqla_tag = Tag(name="SQLAlchemy")
    # Reusing the persistent Tag only adds a row to post_tag; no second
    # INSERT into tag is issued.
    sess.add(
        Post(
            title="some other issue",
            tags=[sqla_tag],
        )
    )
    sess.commit()
    print("2nd try: Success.")
    # no error this time
这是最直接的解决方案。一种更高级的技术是使用关联代理来自动化这个过程,但是一些用户发现它们很难使用。