如何使用 PyGreSQL 执行并行查询?



我正在尝试与 PyGreSQL 和多处理并行运行多个查询,但以下代码挂起而不返回:

from pg import DB
from multiprocessing import Pool
from functools import partial

def create_query(table_name):
return f"""create table {table_name} (id integer);
CREATE INDEX ON {table_name} USING BTREE (id);"""
my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]

def execute_query(conn_string, query):
con = DB(conn_string)
con.query(query)
con.close()
rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)

有什么方法可以让它工作吗?此外,如果一个查询失败而另一个查询回滚,是否可以将 3 个正在运行的查询放在同一个"事务"中?

一个明显的问题是你总是运行pool.map,不仅在主进程中,而且在并行子进程中使用的解释器导入脚本时。你应该做这样的事情:

def run_all():
with Pool(processes=len(my_queries)) as pool:
pool.map(partial(execute_query,rs_conn_string), my_queries)
if __name__ == '__main__':
run_all()

关于您的第二个问题,这是不可能的,因为事务是每个连接的,如果您这样做,它们存在于单独的进程中。

异步命令处理可能是你想要的,但 PyGreSQL 尚不支持它。Psygopg + aiopg 可能更适合做这样的事情。

PyGreSql 添加了 connection.poll(( 方法的异步。就池化而言,我喜欢覆盖MySQL.connectorspooling包装器来处理pgdb连接对象。有一些"可选"的连接方法调用会失败,你必须注释掉(即检查连接状态等,如果需要,可以在 Pgdb 连接对象级别实现,但调用不匹配 MySQL.connectors api 接口(。可能有一些低级错误,因为库只是以类似的方式抽象,但这个解决方案已经在 prod 中运行了几个月,没有任何问题。

最新更新