Error connecting to PostgreSQL: can't pickle psycopg2.extensions.connection objects



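I am trying to build an architecture with one main parent process that can create new child processes. The parent process runs in a continuous loop, checking whether any child process is available.

I used ThreadedConnectionPool from the psycopg2.pool module so that all child processes share a common set of database connections. The idea is that the program connects to the database once and runs every child process's SQL queries over those connections, so there is no need to reconnect each time a query is executed.

The code is as follows: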

from multiprocessing import Process, Lock
import time, os, psycopg2
from psycopg2 import pool

def child(dbConnection, lock, num, pid, sleepTime, query):
    lock.acquire()
    start = time.time()
    print("Child Process {} - Process ID: {}".format(num + 1, str(os.getpid())))
    db_cursor = dbConnection.cursor()
    db_cursor.execute(query)
    records = db_cursor.fetchmany(2)
    print("Displaying rows from User Master Table")
    for row in records:
        print(row)
    print("Executed Query:", query)
    print("Child Process {} - Process ID {} Completed.".format(num + 1, str(os.getpid())))
    end = time.time()
    print("Time taken:", str(end - start), "seconds")
    lock.release()
    time.sleep(sleepTime)

if __name__ == "__main__":
    connectionPool = None
    try:
        connectionPool = psycopg2.pool.ThreadedConnectionPool(5, 21, user = "dwhpkg", password = "dwhpkg", host = "127.0.0.1", port = "5432", database = "dwhdb")
        while True:
            processes = []
            print("Main Process ID: {}".format(str(os.getpid())))
            lock = Lock()

            # 21 Times Process Execution
            for count in range(21):
                if connectionPool :
                    print("Connection Pool Successfully Created")
                    # Getting DB Connection From Connection Pool
                    dbConnection = connectionPool.getconn()
                    if dbConnection:
                        sql_execute_process = Process(target = child, args = (dbConnection, lock, count, os.getpid(), 4, 'SELECT * FROM public."USER_MASTER"',))
                        sql_execute_process.start()
                        processes.append(sql_execute_process)
                        print("Parent Process:", os.getpid())
                        print(processes)
                        time.sleep(5)
            for process in processes:
                process.join()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error Connecting To PostgreSQL", error)
    finally:
        # Closing DB Connection
        if connectionPool:
            connectionPool.closeall()
            print("Connection Pool is closed")

When I try to run the code above, it gives the following error:

Main Process ID: 46700
Connection Pool Successfully Created
Error Connecting To PostgreSQL can't pickle psycopg2.extensions.connection objects
Connection Pool is closed
(task_env) C:\Users\sicuser\Desktop\ジート3_作業案件タスク機能プロトタイプ作成開発>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\sicuser\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\Users\sicuser\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 82, in steal_handle
    _winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameters are incorrect.

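To troubleshoot, I also ran the program in debug mode to locate the error. While debugging, I found that the error is raised by the following line: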

sql_execute_process.start()

[Error message]

Main Process ID: 47708
Connection Pool Successfully Created
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\sicuser\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\sicuser\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
Error Connecting To PostgreSQL can't pickle psycopg2.extensions.connection objects

The environment is Windows with Python 3.7.4.

Looking forward to support from the experts.

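In the code above, you are using a ThreadedConnectionPool with multiprocessing.Process instances (threads != processes).
Multiple processes cannot safely share the same connection; see the thread and process safety section of the Psycopg documentation for details.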
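This is also where the "can't pickle" message comes from: on Windows, multiprocessing starts each child with the spawn method and pickles the arguments passed to Process, and an object that wraps a live OS resource cannot be pickled. The sketch below illustrates the distinction without a database. It assumes a plain socket is a fair stand-in for the connection object, and is_picklable is a hypothetical helper, not part of psycopg2 or multiprocessing.

```python
import pickle
import socket


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    # Plain data such as connection settings can cross a process boundary.
    settings = {"host": "127.0.0.1", "port": "5432"}
    # A socket stands in for the live connection (assumption for illustration).
    sock = socket.socket()
    try:
        print(is_picklable(settings))  # True
        print(is_picklable(sock))      # False
    finally:
        sock.close()
```

The practical consequence is that connection settings can be handed to a child process, while an open connection cannot.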

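You also wrap the critical code in the child in a Lock, which essentially prevents the tasks from running in parallel; even if it worked, performance would be about the same as a single-process solution.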
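A rough way to see the effect of the lock is to time the same workload twice: once with the lock held for the whole task, and once with it held only around a short shared step. This is a minimal sketch in which time.sleep stands in for the query; task and timed_run are hypothetical names, and the timings will vary with process start-up cost on your machine.

```python
import time
from multiprocessing import Lock, Process


def task(lock, seconds, hold_lock):
    """Simulate work lasting `seconds`, optionally holding the lock throughout."""
    if hold_lock:
        with lock:           # whole task inside the lock: one worker at a time
            time.sleep(seconds)
    else:
        time.sleep(seconds)  # the work itself runs concurrently
        with lock:           # the lock guards only a short shared step
            pass


def timed_run(worker_cls, lock, count, seconds, hold_lock):
    """Start `count` workers and return the wall-clock time until all finish."""
    workers = [worker_cls(target=task, args=(lock, seconds, hold_lock))
               for _ in range(count)]
    started = time.perf_counter()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return time.perf_counter() - started


if __name__ == "__main__":
    for hold in (True, False):
        elapsed = timed_run(Process, Lock(), count=4, seconds=0.5, hold_lock=hold)
        print("lock held for whole task:", hold, "-> %.2f s" % elapsed)
```

With the lock held for the whole task, the total time cannot be less than count × seconds, because the sleeps run one after another. Without it, the workers overlap and the total is typically much closer to a single task's duration.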

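The solution depends on how CPU-intensive and long-lived the child processes are:

  • If the children are lightweight or short-lived, just do all the work in a single (main) thread
  • For heavy or long-lived children, connect to the database from inside the child process (do not share connections with the main process)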
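The second option can be sketched as follows: the parent passes only picklable values (a number, a settings dict, and the query string), and each child opens and closes its own connection. The connection settings and query are copied from the question. run_query, DB_SETTINGS, and the connect parameter are hypothetical names introduced here for illustration, and the connect parameter exists only so the function can be exercised without a database.

```python
from multiprocessing import Process

# Settings taken from the question; adjust for your environment.
DB_SETTINGS = {"user": "dwhpkg", "password": "dwhpkg",
               "host": "127.0.0.1", "port": "5432", "dbname": "dwhdb"}
QUERY = 'SELECT * FROM public."USER_MASTER"'


def run_query(settings, query, connect=None):
    """Open a connection in the calling process and return up to two rows."""
    if connect is None:
        import psycopg2  # imported lazily, inside the process that needs it
        connect = psycopg2.connect
    conn = connect(**settings)
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        return cursor.fetchmany(2)
    finally:
        conn.close()  # each child closes the connection it opened


def child(num, settings, query):
    # Only picklable values (int, dict, str) cross the process boundary.
    rows = run_query(settings, query)
    print("Child Process {} fetched: {}".format(num + 1, rows))


if __name__ == "__main__":
    processes = [Process(target=child, args=(n, DB_SETTINGS, QUERY))
                 for n in range(3)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```

Opening a connection per child costs a little at start-up, which is why this fits heavy or long-lived children better than short ones. If a child runs many queries, it could create its own pool after it starts instead of a single connection.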
