给定像这样的SQLAlchemy ORM模型
class Foo(Base):
__tablename__ = 'foo'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String())
在SQLAlchemy 1.4/2.0中,ORM的session.query
习语与SQLAlchemy核心select
函数*统一,因此要获取所有Foo
,我们将要做
foos = session.execute(Foo).scalars().all()
而不是
foos = session.query(Foo).all()
在当前 (sqlalchemy<=1.3) ORM 中,我们可以使用以下查询获取数据库中的Foo
秒数:
nfoos = session.query(Foo).count()
但是我们如何在 SQLALchemy 1.4 中获取计数呢?
session.execute(sa.select(Foo).count())
提高
属性错误:"选择"对象没有属性"计数">
session.execute(sa.select(Foo)).count()
提高
属性错误:"ChunkedIteratorResult"对象没有属性"计数">
session.execute(sa.select(sa.func.count(Foo)))
提高
sqlalchemy.exc.ArgumentError: SQL 表达式元素预期,得到
<class '__main__.Foo'>
.
这行得通,
session.execute(sa.select(sa.func.count(Foo.id))).scalars()
但是指定属性似乎不如Query.count
版本OO/优雅。 此外,它排除了构建查询,但推迟了是检索计数还是模型实例的决定。
在新的SQLAlchemy 1.4世界中获取ORM查询count()
的惯用方法是什么?
*session.query()
API 在 1.4 及更高版本中仍然可以工作
根据 Ilja Everilä 的评论,在 SQLAlchemy 1.4 中发布的新 ORM 查询 API 中似乎没有直接等同于 Query.count(在撰写本文时处于测试阶段)。
功能等价物是调用count()
,从子查询中选择*
from sqlalchemy import func, select
count = (
session.execute(select(func.count()).select_from(select(Foo).subquery()))
.scalar_one()
)
生成此 SQL
SELECT count(*) AS count_1
FROM (SELECT foo.id AS id, foo.name AS name
FROM foo)
AS anon_1
对属性(如主键列)进行计数会生成一个简单的SELECT COUNT
count = session.execute(select(func.count(Foo.id))).scalar_one()
SELECT count(foo.id) AS count_1
FROM foo
Query.count
在 1.4 中仍然受支持,因此它仍然可以使用,1.3 中提供的所有 ORM 功能也可以使用。
*Query.count
也从子查询中进行选择,而不是直接执行SELECT COUNT
。
如果您已经构造了一个包含连接和多个条件的复杂query
,则可以像这样计算查询结果:
count_query = query.with_only_columns(func.count(Foo.id))
count = session.execute(count_query).scalar_one()
这个想法是使用查询的副本并将所选列替换为单个计数列。
您可以使用行计数
# using sqlalchemy
from sqlalchemy import create_engine
def connect_to_db():
"""Create database connection."""
url = f"mysql+pymysql://{user}:{db_pwd}@{host}:{port}/{db_name}"
try:
engine = create_engine(url)
return engine.connect()
except Exception as e:
raise e
def execute_query(query):
connection = connect_to_db()
with connection as con:
result = con.execute(query)
return result
results = execute_query("show tables;")
print("count = ",results.rowcount)
output: count = 1298
对我来说 - 使用Flask-Sqlalchemy,使用SQLAlchemy 2.X版本 - 这有效:
subselect = db.select(Tablename).filter_by(user_id=current_user.id, ...)
select = db.select(db.func.count()).select_from(subselect)
number_of_rows = db.session.execute(query).scalar_one()
向@snakecharmerb大喊大叫 - 罢工
为他/她自己的问题提供答案,并真正帮助我!
我正在使用Flask,SQLAlchemy的session
相当于Flask-SQLAlchemy中的db.session
(如果我错了,请纠正我),以下是简单计数的示例:
遗产:
>>> db.session.query(Order).count()
2111
2.0 语法(表中的简单行数):
>>> from sqlalchemy import func
>>> db.session.scalar(db.select(func.count(Order.order_id)))
2111
我假设对任何列进行计数都是安全的,并且计数不会消除重复项(我保证,我没有 2000+ 种货币):
>>> db.session.scalar(db.select(func.count(Order.currency)))
2111
附言通过选择语句print(db.select(func.count(Order.currency)))
生成的 SQL:
SELECT count("order".currency) AS count_1
FROM "order"
2.0 语法(使用子查询):
正如其他人所建议的那样,从子查询计数更加复杂。
>>> to_filter = db.select(Order).where(Order.currency=='GBP')
>>> db.session.scalar(db.select(func.count()).select_from(to_filter))
17