How to run a PostgreSQL query that starts with "DO $$" in Python



I have a PostgreSQL query that starts with DO:

do
$$
DECLARE
    temprow record;
BEGIN
    for temprow in
        select *
        from generate_series(1, 100)
        where generate_series % 2 = 0
    loop
        with cte_input(val) as (select val from (values (temprow.generate_series)) as t(val))
        insert
        into tmp_table(input_value, value_100)
        select cte_input.val as input_value, cte_input.val::float / 100 as value_100
        from cte_input;
        commit;
    end loop;
END
$$ LANGUAGE plpgsql;

How can I run this query with Python and psycopg2? And if I need to run it several times with some dynamic changes, is a temporary function the right approach?

UPD

Thanks to @Erwin Brandstetter for the information about COMMIT. I removed COMMIT from the query block and added it to the Python code instead: ps_cursor.execute('COMMIT').

This is how I wrote the code:


import concurrent.futures
import psycopg2 as pg
from psycopg2 import pool

features = [(1, name_of_feature_1), ...]  # list of (feature_id, column name) pairs

list_query = []

# Build one temporary procedure plus its CALL per feature
for feature in features:
    feature_id = feature[0]
    name_feature = feature[1]
    query = f"""--Feature:{feature_id}
create or replace procedure pg_temp.proc_feature_{feature_id}_values()
language plpgsql
as

$$
DECLARE
    temprow record;
BEGIN
    for temprow in
        select *
        from tmp_maternal_sample
        where maternal_sample = 1000
    loop
        insert
        into tmp_feature_values(feature_id,
                                feature_values_array,
                                maternal_sample)
        select feature_id,
               array_agg(t_rank.{name_feature}) f_values,
               temprow.maternal_sample
        from t_rank
        ....
        ....

    end loop;
end
$$;
call pg_temp.proc_feature_{feature_id}_values();
"""
    list_query.append(query)


def load_query(query):
    # getconn() raises PoolError when the pool is exhausted, so no None check is needed
    ps_connection = threaded_postgreSQL_pool.getconn()
    try:
        print(f"Successfully received connection from connection pool for Query {query[:15]}")
        ps_cursor = ps_connection.cursor()
        ps_cursor.execute(query)
        ps_cursor.execute('COMMIT')
        ps_cursor.close()
    finally:
        # Always return the connection, otherwise the pool runs dry
        threaded_postgreSQL_pool.putconn(ps_connection)
    result = f'Query {query[:15]} finished'
    print(result)
    return result


threaded_postgreSQL_pool = None  # defined up front so the finally block can test it
try:
    # user, password, host, port and database are defined elsewhere;
    # connection settings have to be passed as keyword arguments
    threaded_postgreSQL_pool = pool.ThreadedConnectionPool(
        1, 32, user=user, password=password, host=host, port=port, dbname=database)
    print("Connection pool created successfully using ThreadedConnectionPool")

    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
        future_to_sql = {executor.submit(load_query, query): query for query in list_query}
        for future in concurrent.futures.as_completed(future_to_sql):
            sql = future_to_sql[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%s generated an exception: %s' % (sql[:15], exc))
            else:
                print('%s returned: %s' % (sql[:15], data))

except (Exception, pg.DatabaseError) as error:
    print("Error while connecting to PostgreSQL", error)

finally:
    if threaded_postgreSQL_pool:
        threaded_postgreSQL_pool.closeall()  # the call parentheses were missing
        print('Threaded PG connection pool is closed')

It is safe to assume Postgres 11 or later, because:

COMMIT works in one plpgsql code block, but not in another
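To make the version assumption explicit on the Python side, here is a minimal sketch. It relies on psycopg2 exposing the server version as an integer in `connection.server_version` (for example 110005 for 11.5). The helper name `supports_transaction_control` is made up for illustration.

```python
def supports_transaction_control(server_version: int) -> bool:
    """Return True for PostgreSQL 11 or later.

    Transaction control (COMMIT / ROLLBACK) inside DO blocks and
    procedures was introduced in PostgreSQL 11.
    """
    return server_version >= 110000


# Usage with an open psycopg2 connection named conn (hypothetical):
#
# if supports_transaction_control(conn.server_version):
#     # psycopg2 opens a transaction implicitly on the first execute(),
#     # and a COMMIT inside a DO block is rejected when the block runs
#     # inside an outer transaction, so autocommit is switched on first.
#     conn.autocommit = True
```

If the COMMIT is removed from the block, as in the update to the question, none of this is needed and a plain commit from Python is enough.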

Your DO statement is convoluted for no apparent reason. Simpler:

DO
LANGUAGE plpgsql
$do$
DECLARE
   i int;
BEGIN
   FOR i IN
      SELECT generate_series(2, 100, 2)
   LOOP
      INSERT INTO tmp_table(input_value, value_100)
      VALUES (i, i::float / 100);
      -- COMMIT;  -- ?
   END LOOP;
END
$do$;

Which boils down to just this, even including the creation of the temp table:

CREATE TEMP TABLE tmp_table AS
SELECT g AS input_value, g::float / 100 AS value_100
FROM   generate_series(2, 100, 2) g;
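For the "dynamic changes" part of the question, one possible approach is to keep the single statement and let the driver bind the changing values, instead of generating a function per variant. This is only a sketch: the helper name `build_series_query` is made up, and it assumes the only things that vary are the series bounds and step.

```python
def build_series_query(start, stop, step):
    """Return (sql, params) suitable for cursor.execute(sql, params).

    The values are bound by the driver through %s placeholders rather
    than being formatted into the SQL string by hand.
    """
    sql = (
        "CREATE TEMP TABLE tmp_table AS "
        "SELECT g AS input_value, g::float / 100 AS value_100 "
        "FROM generate_series(%s, %s, %s) g"
    )
    return sql, (start, stop, step)


sql, params = build_series_query(2, 100, 2)
# With an open psycopg2 cursor named cur (hypothetical):
# cur.execute(sql, params)
```

When parameters are passed like this, psycopg2 treats `%` as a placeholder marker, so a literal `%` in the SQL (such as the modulo operator in the original query) would have to be written as `%%`. Running the statement a second time in the same session fails because tmp_table already exists, so repeated runs need a DROP TABLE IF EXISTS first or an INSERT into the existing table.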

db<>fiddle here

Some setups (like dbfiddle.uk) still don't allow transaction handling with COMMIT. Are you sure you even need it?

Either way, just execute the raw SQL.
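"Executing the raw SQL" from psycopg2 could look roughly like the sketch below. The helper `run_raw_sql`, the `main` function and the DSN are placeholders for illustration, and tmp_table is assumed to exist already. The block contains no COMMIT, so the commit is issued from Python.

```python
DO_BLOCK = """
DO
LANGUAGE plpgsql
$do$
DECLARE
   i int;
BEGIN
   FOR i IN
      SELECT generate_series(2, 100, 2)
   LOOP
      INSERT INTO tmp_table(input_value, value_100)
      VALUES (i, i::float / 100);
   END LOOP;
END
$do$;
"""


def run_raw_sql(conn, sql):
    """Execute one raw SQL string on an open DB-API connection, then commit."""
    with conn.cursor() as cur:
        # No parameters are passed, so the string is sent to the server as-is
        cur.execute(sql)
    conn.commit()


def main():
    # Imported here so the helper above can be used without psycopg2 installed
    import psycopg2

    conn = psycopg2.connect("dbname=test")  # hypothetical DSN, adjust as needed
    try:
        run_raw_sql(conn, DO_BLOCK)
    finally:
        conn.close()
```

Because the DO block is passed as an ordinary string, the dollar quoting needs no special treatment on the Python side.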
