SQLAlchemy—将一个类映射到多个表,其中一个表是只读的



我有一个foobar概念,它是由'foobar'和'foobar_data_cache'表表示的数据库和'foobar'类是python。'foobar'表表示有关概念的一些数据,'foobar_data_cache'表表示有关概念的一些其他数据,这些数据来自数据库中的许多其他信息,并由数据库触发器计算。为了保持一致性,从'foobar_data_cache'表中撤销INSERT, UPDATE和DELETE权限。

使用SQLAlchemy,我想将'Foobar'类映射到两个表'Foobar'和'foobar_data_cache'与连接。没有理由使用另一个类来表示来自'foobar_data_cache'表的数据并在这两个类之间建立关系,因为来自两个表的数据是强相关的。实际上,从数据库的角度来看,这两个表之间存在一对一的关系,由以下语句保证:
- 'foobar_data_cache'的主键也是引用'foobar'的主键的外键
-以及一个触发器,检查'foobar'中的每一行在'foobar_data_cache'中是否有相应的行

我的问题是,当我尝试用SQLAlchemy ORM持久化一个新的Foobar对象时,它试图为'foobar_data_cache'表插入一些行,这是我想要防止的。

那么,是否可以将SQLAlchemy配置为将'foobar_data_cache'表视为只读的方式?如果是,怎么做?

下面是解释我的问题的代码:

from sqlalchemy import (
    Table,
    Column,
    Integer,
    String,
    ForeignKeyConstraint,
    join,
    create_engine,
    )
from sqlalchemy.orm import (
    column_property,
    sessionmaker,
    )
from sqlalchemy.schema import (
    FetchedValue,
    )
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
_foobar_table = Table('foobar', Base.metadata,
        Column('id', Integer, primary_key=True),
        Column('some_data', String),
    )
_foobar_data_cache_table = Table('foobar_data_cache', Base.metadata,
        Column('foobar_id', Integer, primary_key=True),
        Column('computed_data', String, server_default=FetchedValue()),
        ForeignKeyConstraint(['foobar_id'], ['foobar.id']),
    )
class Foobar(Base):
    __table__ = _foobar_table.join(_foobar_data_cache_table)
    _id = column_property(_foobar_table.c.id, _foobar_data_cache_table.c.foobar_id)
    def __init__(self, some_data):
        self.some_data = some_data
if __name__ == '__main__':
    engine = create_engine('postgresql://tester@localhost:5432/mytestdb')
    Session = sessionmaker(bind=engine)
    session = Session()
    my_foobar = Foobar('Dummy data')
    session.add(my_foobar)
    session.commit()
下面是创建两个表的SQL命令:
CREATE TABLE foobar (
  id int NOT NULL DEFAULT -2147483648,
  some_data varchar NOT NULL,
  CONSTRAINT pk_foobar PRIMARY KEY (id)
);
CREATE TABLE foobar_data_cache (
  foobar_id int NOT NULL,
  computed_data varchar NOT NULL,
  CONSTRAINT pk_foobar_data_cache PRIMARY KEY (foobar_id),
  CONSTRAINT fk_foobar_data_cache_foobar_1 FOREIGN KEY (foobar_id)
    REFERENCES foobar (id) MATCH FULL
    ON UPDATE CASCADE ON DELETE CASCADE
);  

注意:
有些人可能想知道,考虑到它们之间是一对一的关系,为什么我要将数据拆分到两个不同的表中。通过在计算列上使用单个表和FetchedValue构造(参见如何在SQLAlchemy上仅保存映射列的子集?),可以很容易地解决这个问题。嗯,这有点复杂,但我会尽力解释的。首先,上面没有解释的其他事情是:
-我使用的PostgreSQL 8.4,其中,不能有可延迟的UNIQUE约束
-我的列都不接受NULL值
一些列在'foobar_data_cache'有一个唯一的约束(不可延迟)
-在'foobar_data_cache'中计算数据的触发器延迟到事务结束。事实上,这是因为它从其他表中获取信息,这些信息只能在'foobar'上插入之后插入,由于外键约束。

也就是说,在使用单个表的情况下,由于NOT-NULL约束,这意味着我必须为计算的列使用临时虚拟值。我的触发器最终会在事务结束时覆盖它。问题在于并发性。实际上,当另一个事务T1正在执行时,新事务Tx试图插入新的'foobar',将会失败,因为唯一列的默认虚拟值已经存在于执行事务T1对应的行中。我可以为具有唯一约束的列生成随机虚拟值,但我不喜欢这种方式。

如果没有可用的表主键,ORM将跳过任何INSERT/UPDATE/DELETE操作。在这种情况下,exclude_properties将实现这一点,同时注意组合"PK"列消失了,因为我们不关心"foobar_id"的值:

from sqlalchemy import (
    Table,
    Column,
    Integer,
    String,
    ForeignKeyConstraint,
    join,
    create_engine,
    )
from sqlalchemy.orm import (
    column_property,
    sessionmaker,
    )
from sqlalchemy.schema import (
    FetchedValue,
    )
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
_foobar_table = Table('foobar', Base.metadata,
        Column('id', Integer, primary_key=True),
        Column('some_data', String),
    )
_foobar_data_cache_table = Table('foobar_data_cache', Base.metadata,
        Column('foobar_id', Integer, primary_key=True),
        Column('computed_data', String, server_default=FetchedValue()),
        ForeignKeyConstraint(['foobar_id'], ['foobar.id']),
    )
class Foobar(Base):
    __table__ = _foobar_table.join(_foobar_data_cache_table)
    _id = _foobar_table.c.id
    def __init__(self, some_data):
        self.some_data = some_data
    __mapper_args__ = {"exclude_properties": [_foobar_data_cache_table.c.foobar_id]}
if __name__ == '__main__':
    engine = create_engine('postgresql://scott:tiger@localhost/test', echo=True)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    my_foobar = Foobar('Dummy data')
    session.add(my_foobar)
    # simulate your trigger...
    session.flush()
    assert session.scalar(_foobar_data_cache_table.count()) == 0
    session.execute(
        _foobar_data_cache_table.insert(),
        params=dict(
                foobar_id=my_foobar._id,
                computed_data="some computed data"
                )
    )
    session.commit()
    obj = session.query(Foobar).first()
    assert obj.computed_data == "some computed data"

也就是说,如果您使用传统的relationship()作为_foobar_foobar_data_cache_table之间的链接,而不是组合成映射的join(),那么整个映射将方式更简单。

相关内容

  • 没有找到相关文章

最新更新