如何在SQLAlchemy中执行原始SQL?
我有一个python web应用程序,它在flask上运行,并通过SQLAlchemy连接到数据库。
我需要一种方法来运行原始 SQL。查询涉及多个表联接以及内联视图。
我试过:
connection = db.session.connection()
connection.execute( <sql here> )
但是我不断收到网关错误。
你试过吗:
result = db.engine.execute("<sql here>")
或:
from sqlalchemy import text
sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names
请注意,db.engine.execute()
是"无连接"的,这在SQLAlchemy 2.0中已弃用。
SQL Alchemy 会话对象有自己的execute
方法:
result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
所有应用程序查询都应通过会话对象,无论它们是否是原始 SQL。这可确保事务正确管理查询,从而允许将同一请求中的多个查询作为单个单元提交或回滚。使用引擎或连接走出事务会使您面临更大的微妙风险,可能难以检测到的错误,这些错误可能会给您留下损坏的数据。每个请求应仅与一个事务相关联,使用 db.session
将确保应用程序就是这种情况。
另请注意,execute
是为参数化查询而设计的。对查询的任何输入使用参数(如示例中的参数:val
(,以保护自己免受 SQL 注入攻击。可以通过传递dict
作为第二个参数来提供这些参数的值,其中每个键都是查询中显示的参数的名称。参数本身的确切语法可能因数据库而异,但所有主要的关系数据库都以某种形式支持它们。
假设它是一个SELECT
查询,这将返回RowProxy
对象的可迭代对象。
您可以使用多种技术访问各个列:
for r in result:
print(r[0]) # Access by positional index
print(r['my_column']) # Access by column name as a string
r_dict = dict(r.items()) # convert to dict keyed by column names
就个人而言,我更喜欢将结果转换为namedtuple
:
from collections import namedtuple
Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
print(r.my_column)
print(r)
如果不使用 Flask-SQLAlchemy 扩展,您仍然可以轻松使用会话:
import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session
engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))
s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
from_statement()
和text()
获取 SELECT SQL 查询的结果,如下所示。您不必以这种方式处理元组。作为具有表名的类User
的示例,您可以尝试users
,
from sqlalchemy.sql import text
user = session.query(User).from_statement(
text("""SELECT * FROM users where name=:name""")
).params(name="ed").all()
return user
文档:SQL 表达式语言教程 - 使用文本
例:
from sqlalchemy.sql import text
connection = engine.connect()
# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)
# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
text('select * from Employees where EmployeeGroup = :group'),
group = employeeGroup)
# or - notice the requirement to quote 'Staff'
employees = connection.execute(
text("select * from Employees where EmployeeGroup = 'Staff'"))
for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')
对于 SQLAlchemy ≥ 1.4
从 SQLAlchemy 1.4 开始,不推荐使用无连接或隐式执行,即
db.engine.execute(...) # DEPRECATED
以及作为查询的裸字符串。
新的 API 需要显式连接,例如
from sqlalchemy import text
with db.engine.connect() as connection:
result = connection.execute(text("SELECT * FROM ..."))
for row in result:
# ...
同样,如果现有会话可用,建议使用现有会话:
result = session.execute(sqlalchemy.text("SELECT * FROM ..."))
或使用参数:
session.execute(sqlalchemy.text("SELECT * FROM a_table WHERE a_column = :val"),
{'val': 5})
有关更多详细信息,请参阅文档中的"无连接执行,隐式执行"。
result = db.engine.execute(text("<sql here>"))
执行<sql here>
但不提交它,除非你处于autocommit
模式。因此,插入和更新不会反映在数据库中。
要在更改后提交,请执行
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))
这是如何从 Flask Shell 运行 SQL 查询的简化答案
首先,映射模块(如果您的模块/应用程序 manage.py 在主体文件夹中,并且您使用的是 UNIX 操作系统(,运行:
export FLASK_APP=manage
运行烧瓶外壳
flask shell
导入我们需要的东西:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text
运行查询:
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))
这使用具有应用程序的当前数据库连接。
3.0.x/SQLAlchemy v: 1.4
users = db.session.execute(db.select(User).order_by(User.title.desc()).limit(150)).scalars()
因此,基本上对于flask-sqlchemy的最新稳定版本,文档建议将session.execute()
方法与db.select(Object)
结合使用。
您是否尝试过按照文档中的说明使用connection.execute(text( <sql here> ), <bind params here> )
和绑定参数?这有助于解决许多参数格式和性能问题。也许网关错误是超时?绑定参数往往会使复杂查询的执行速度大大加快。
如果你想避免元组,另一种方法是调用first
、one
或all
方法:
query = db.engine.execute("SELECT * FROM blogs "
"WHERE id = 1 ")
assert query.first().name == "Welcome to my blog"