执行(使用try&except)预构建SQL文件并获得服务器响应/错误的Pythonic方式



我正在执行一系列sql语句,1乘1(初始语句的输出将在以后依赖(。这些查询包括表创建、表更改和视图更新。

我可以在Python1中执行它们,但出于日志记录和安全协议的目的,我需要弄清楚如何实现以下结果:

  1. 如果查询失败,则获取返回的响应
  2. 正确使用try和except不干扰任何没有完全执行的东西

这是我今天使用的代码(在目录和目录中的文件之间循环(。在进入这里的循环之前,有一些变量声明和os目录设置。

编辑:tls别名是我使用的一个自行创建的工具模块,tls.create_lake_engine((函数只是使用sqlalchemy的create_engine方法创建一个连接到sql server的引擎。

for file in contents:
# check if file is a sql statement or a python module
if file.endswith('.sql'):
# read the query
file_contents = open(file).read()
statements = file_contents.split(';')
print('executing {} statements from file {}'.format(
len(statements), file))
statements = [x.strip() for x in statements]
i = 0
for statement in statements:
i+=1
print('texecuting statement {}'.format(i))
engine = tls.create_lake_engine()
conn = engine.connect()
trans = conn.begin()
conn.execute(statement)
trans.commit()

我对sqlalchemy不是很了解,所以您可能希望查看一些细节。但在我看来:

  1. 您应该尝试只创建一次连接,并对处理的每个文件重复使用它,因为这比为每个文件创建连接更有效
  2. 在处理.sql文件时,一旦执行其中一条语句时遇到异常,此时回滚该文件中所做的所有更改并停止执行该文件中的语句可能更有意义。不管怎样,这就是我所采取的方法。如果希望继续执行.sql文件,请从函数execute_script中删除break语句,并无条件执行trans.commit,无论是否发生错误。但从你的问题听起来,你不想";扰乱任何没有完全执行";对我来说,这意味着放弃并从第一个错误中恢复过来
from sqlalchemy.sql import text
def get_statements(file):
"""
A generator function for yielding one by one
all the statements within the passed file.
Lines constituting a statement are stripped of leading and trailing
whitespace and then joined together with a single space character.
"""
with open(file, 'r') as f:
command = []
for line in f:
# Is this a comment line?
if line.startswith('--'):
continue
cmd = line.strip()
if cmd.endswith(';'):
# remove ';'
command.append(cmd[:-1])
yield text(' '.join(command))
command = []
else:
command.append(cmd)
def execute_script(conn, file):
statements = list(get_statements(file))
print(f'executing {len(statements)} statements from file {file}')
for i, statement in enumerate(statements, start=1):
print(f'texecuting statement {i}')
trans = conn.begin()
error = False
try:
conn.execute(statement)
except SQLAlchemyError as e:
# See https://stackoverflow.com/questions/2136739/error-handling-in-sqlalchemy
print(str(e.__dict__['orig']))
error = True
break # Stop execting the file
if error:
trans.rollback()
else:
trans.commit()

engine = tls.create_lake_engine()
conn = engine.connect()
for file in contents:
# check if file is a sql statement or a python module
if file.endswith('.sql'):
execute_script(conn, file)

最新更新