Python sqlalchemy和mySQL存储过程总是返回0(仅限out param)



我正试图将ROW_COUNT()从MySQL存储过程中获取到python中。

这是我得到的,但我不知道我缺少了什么。

DELIMITER //
CREATE OR REPLACE PROCEDURE sp_refresh_mytable(
OUT row_count INT
)
BEGIN
DECLARE exit handler for SQLEXCEPTION
BEGIN
ROLLBACK;
END;
DECLARE exit handler for SQLWARNING
BEGIN
ROLLBACK;
END;
DECLARE exit handler FOR NOT FOUND
BEGIN
ROLLBACK;
END;
START TRANSACTION;
DELETE FROM mytable;
INSERT INTO mytable
(
col1
, col2
)
SELECT
col1
, col2
FROM othertable
;
SET row_count =  ROW_COUNT();
COMMIT;
END //
DELIMITER ;

如果我像下面这样通过普通SQL调用它,我会得到插入操作的正确row_count(例如,插入的26行(:

CALL sp_refresh_mytable(@rowcount);
select @rowcount as t;
-- output: 26 

然后在python/mysqlalchemy:

def call_procedure(engine, function_name, params=None):
connection = engine.raw_connection()
try:
cursor = connection.cursor()
result = cursor.callproc('sp_refresh_mytable', [0])
## try result outputs
resultfetch = cursor.fetchone()
logger.info(result)
logger.info(result[0])
logger.info(resultfetch)
cursor.close()
connection.commit()
connection.close()
logger.info(f"Running procedure {function_name} success!")
return result
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
connection.close()

所以我尝试记录获取out值的不同变化,但它总是0或None。

[INFO] db_update    [0]
[INFO] db_update    0
[INFO] db_update    None

我错过了什么?

谢谢!

在这个答案的帮助下,我找到了以下适合我的解决方案。

a( 使用engine.raw_connection()cursor.callproc:的工作解决方案

def call_procedure(engine, function_name):
connection = engine.raw_connection()
try:
cursor = connection.cursor()
cursor.callproc(function_name, [0])
cursor.execute(f"""SELECT @_{function_name}_0""")
results = cursor.fetchone() ## returns a tuple e.g. (285,)
rows_affected = results[0]
cursor.close()
connection.commit()
logger.info(f"Running procedure {function_name} success!")
return rows_affected
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
connection.close()

有了这个答案,我也找到了这个解决方案:

b( 这也起到了作用,而不是使用原始连接:

def call_procedure(engine, function_name, params=None):
try:
with engine.begin() as db_conn:
db_conn.execute(f"""CALL {function_name}(@out)""")
results = db_conn.execute('SELECT @out').fetchone() ## returns a tuple e.g. (285,)
rows_affected = results[0]
logger.debug(f"Running procedure {function_name} success!")
return rows_affected
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
if db_conn: db_conn.close()

如果使用其中一种方法比使用另一种方法有任何优点或缺点,请在评论中告诉我。

我只是想添加另一段代码,因为我试图让callproc(使用sqlalchemy(使用多个输入和输出参数

对于这种情况,我使用了callproc方法,使用了一个原始连接[在我之前的回答中的解决方案b(],因为这个函数接受params作为列表。

它可能会做得更优雅,或者在某些方面更像蟒蛇,但这主要是为了让它发挥作用,我可能会从中创建一个函数,这样我就可以用它来通用地调用具有多个输入和输出参数的SP。

我在下面的代码中加入了注释,以便更容易理解正在发生的事情

在我的情况下,我决定把out params放在dict中,这样我就可以把它传递给调用应用程序,以防我需要对结果做出反应。当然,您也可以将in params包括在内,这可能对错误日志记录有意义。

## some in params
function_name   = 'sp_upsert'
in_param1       = 'my param 1'
in_param2       = 'abcdefg'
in_param3       = 'some-name'
in_param4       = 'some display name'
in_params = [in_param1, in_param1, in_param1, in_param1]
## out params
out_params = [
'out1_row_count'
,'out2_row_count'
,'out3_row_count'
,'out4_row_count_ins'
,'out5_row_count_upd'
]
params = copy(in_params)
## adding the outparams as integers from out_params indices
params.extend([i for i, x in enumerate(out_params)])
## the params list will look like
## ['my param 1', 'abcdefg', 'some-name', 'some display name', 0, 1, 2, 3, 4]
logger.info(params)
## build query to get results from callproc (including in and out params)
res_qry_params = []
for i in range(len(params)):
res_qry_params.append(f"@_{function_name}_{i}")
res_qry = f"SELECT  {', '.join(res_qry_params)}"
## the query to fetch the results (in and out params) will look like
## SELECT  @_sp_upsert_0, @_sp_upsert_1, @_sp_upsert_2, @_sp_upsert_3, @_sp_upsert_4, @_sp_upsert_5, @_sp_upsert_6, @_sp_upsert_7, @_sp_upsert_8
logger.info(res_qry)
try:
connection = engine.raw_connection()
## calling the sp
cursor = connection.cursor()
cursor.callproc(function_name, params)
## get the results (includes in and out params), the 0/1 in the end are the row_counts from the sp
## fetchone is enough since all results come as on result record like
## ('my param 1', 'abcdefg', 'some-name', 'some display name', 1, 0, 1, 1, 0)
cursor.execute(res_qry)
results = cursor.fetchone()
logger.info(results)
## adding just the out params to a dict
res_dict = {}
for i, element in enumerate(out_params):
res_dict.update({
element: results[i + len(in_params)]
})
## the result dict in this case only contains the out param results and will look like
## { 'out1_row_count': 1,
##   'out2_row_count': 0,
##   'out3_row_count': 1,
##   'out4_row_count_ins': 1,
##   'out5_row_count_upd': 0}
logger.info(pformat(res_dict, indent=2, sort_dicts=False))
cursor.close()
connection.commit()
logger.debug(f"Running procedure {function_name} success!")
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)

为了完整起见,这里是我的存储过程的缩短版本。在BEGIN之后,我声明了一些错误处理程序,我将out params设置为默认值0,否则,如果过程没有设置,它们也可能返回为NULL/None(例如,因为没有进行插入(:

DELIMITER //
CREATE OR REPLACE PROCEDURE sp_upsert(
IN in_param1 VARCHAR(32),
IN in_param2 VARCHAR(250),
IN in_param3 VARCHAR(250),
IN in_param4 VARCHAR(250),
OUT out1_row_count INTEGER,
OUT out2_row_count INTEGER,
OUT out3_row_count INTEGER,
OUT out4_row_count_ins INTEGER,
OUT out5_row_count_upd INTEGER
)
BEGIN
-- declare variables, do NOT declare the out params here!
DECLARE dummy INTEGER DEFAULT 0;

-- declare error handlers (e.g. continue handler for not found)
DECLARE CONTINUE HANDLER FOR NOT FOUND SET dummy = 1;

-- set out params defaulting to 0
SET out1_row_count = 0;
SET out2_row_count = 0;
SET out3_row_count = 0;
SET out4_row_count_ins = 0;
SET out5_row_count_upd = 0;
-- do inserts and updates and set the outparam variables accordingly
INSERT INTO some_table ...;
SET out1_row_count = ROW_COUNT();
-- commit if no errors
COMMIT;
END //
DELIMITER ;

最新更新