Python 多处理限制.仅限于 3 个数据库连接



我正在使用Python 3.6.8,并且有一个需要运行77次的函数。我正在传入数据,这些数据是从PostgreSQL中提取数据并进行统计分析,然后放回数据库中。我只能同时运行 3 个进程,因为一次一个进程需要很长时间(每个函数调用大约 10 分钟(,而且我一次只能打开 3 个数据库连接。我正在尝试使用多处理的轮询库,它正在尝试一次启动所有这些库,这导致了太多的连接错误。我是否正确使用了轮询方法,如果不是,我应该使用什么来限制只有 3 个同时开始和完成的功能。


def AnalysisOf3Years(data):
FUNCTION RAN HERE
######START OF THE PROGRAM######
print("StartTime ((FINAL 77)): ", datetime.now())
con = psycopg2.connect(host="XXX.XXX.XXX.XXX", port="XXXX", user="USERNAME", password="PASSWORD", dbname="DATABASE")
cur = con.cursor()
date = datetime.today()
cur.execute("SELECT Value FROM table")
Array = cur.fetchall()
Data = []
con.close()
for value in Array:
Data.append([value,date])
p = Pool(3)
p.map(AnalysisOf3Years,Data)
print("EndTime ((FINAL 77)): ", datetime.now())

您似乎只需要短暂的数据库连接,而脚本处理数据所花费的时间大部分。如果是这种情况,您可能希望将数据拉出一次,然后将数据写入磁盘。然后,您可以在程序的每个新实例中从磁盘新鲜加载此数据,而不必担心与数据库的连接限制。

如果你想研究连接池,你希望使用pgbouncer。这是一个单独的程序,位于主程序和数据库之间,汇集了您为其提供的连接数。然后,您可以自由地将脚本编写为单线程程序,并且可以生成机器可以处理的任意数量的实例。

很难说出为什么你的程序行为不端,因为缩进似乎是错误的。但猜测一下,你似乎没有在__main__守卫的一侧创建一个使用你的游泳池。在某些操作系统上,这可能会导致各种奇怪的问题。

您希望行为良好的代码如下所示:

from multiprocessing import Pool
def double(x):
return x * 2
if __name__ == '__main__':
# means pool only gets created in the main parent process and not in the child pool processes
with Pool(3) as pool:
result = pool.map(double, range(5))
assert result == [0, 2, 4, 6, 8, 10]

您可以使用具有数据库连接池作为标准功能的 SQLAlchemy Python 包。

它确实适用于Postgres和许多其他数据库后端。

engine = create_engine('postgresql://me@localhost/mydb',
pool_size=3, max_overflow=0)

pool_size数据库的最大连接数。您可以将其设置为 3。

此页面有一些示例如何将其与 Postgres 一起使用 -

https://docs.sqlalchemy.org/en/13/core/pooling.html

根据您的用例,您可能还对以下内容感兴趣SingletonThreadPool

https://docs.sqlalchemy.org/en/13/core/pooling.html#sqlalchemy.pool.SingletonThreadPool

每个线程维护一个连接的池。

最新更新