多处理 / psycopg2 类型错误:无法腌制_thread。RLock 对象



我遵循以下代码,以便在postgres数据库上实现并行选择查询:

https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/

我的基本问题是我需要执行 ~6k 个查询,我正在尝试优化这些选择查询的执行。最初它是一个查询,where id in (...)包含所有 6k 谓词 ID,但我在运行它的机器上遇到查询使用>多达 4GB RAM 的问题,所以我决定将其拆分为 6k 个单独的查询,当同步保持稳定的内存使用量时。但是,运行时间需要更长的时间,这对于我的用例来说不是什么问题。即便如此,我还是试图尽可能减少时间。

这是我的代码的样子:

class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.engine = self.init_connection()
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS)
def init_connection(self):
LOGGER.info('Creating Postgres engine')
return create_engine(self.db_url)
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
self.pool.close()
self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
con = psycopg2.connect(self.db_url)
cur = con.cursor()
cur.execute(query)
records = cur.fetchall()
con.close()
return list(records)

但是,每当它运行时,我都会收到以下错误:

TypeError: can't pickle _thread.RLock objects

我已经读过很多关于使用多处理和可腌制对象的类似问题,但我一生都无法弄清楚我做错了什么。

池通常是每个进程一个(我认为这是最佳做法(,但每个连接器类的实例共享,因此它不会为每次使用 parallel_query 方法创建一个池。

类似问题的最高答案:

从 Python 多处理访问 MySQL 连接池

显示了与我自己的几乎相同的实现,除了使用 MySql 而不是 Postgres。

我做错了什么吗?

谢谢!

编辑:

我找到了这个答案:

Python Postgres psycopg2 ThreadedConnectionPool exhausted

这非常详细,看起来好像我误解了multiprocessing.PoolThreadedConnectionPool等连接池给我的东西。但是在第一个链接中,它没有提到需要任何连接池等。这个解决方案看起来不错,但似乎有很多代码,我认为这是一个相当简单的问题?

编辑2:

所以上面的链接解决了另一个问题,无论如何我都可能会遇到这个问题,所以我很高兴我找到了,但它并没有解决无法使用imap_unordered酸洗错误的初始问题。非常令人沮丧。

最后,我认为可能值得注意的是,这在 Heroku 中运行,在工人测功机上,使用 Redis rq 进行调度、后台任务等,并将 Postgres 的托管实例作为数据库。

简单地说,postgres 连接和 sqlalchemy 连接池是线程安全的,但它们不是分叉安全的。

如果要使用多处理,则应在分叉后的每个子进程中初始化引擎。

如果要共享引擎,则应改用多线程。

请参阅 psycopg2 文档中的螺纹和过程安全性:

libpq 连接 不应该被分叉进程使用,所以当使用这样的模块时 作为多处理或分叉Web部署方法(如FastCGI(使 确保在分叉后创建连接。

如果您使用的是多处理。池,有一个关键字参数初始值设定项,可用于在每个子进程上运行一次代码。试试这个:

class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))
@classmethod
def init_connection(cls, db_url):
def _init_connection():
LOGGER.info('Creating Postgres engine')
cls.engine = create_engine(db_url)
return _init_connection
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
pass
#self.pool.close()
#self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
with self.engine.connect() as conn:
with conn.begin():
result = conn.execute(query)
return result.fetchall()
def __getstate__(self):
# this is a hack, if you want to remove this method, you should
# remove self.pool and just pass pool explicitly
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict

现在,解决XY问题。

最初它是一个查询,其中包含 (...( 中的 where id 所有 6k 谓词 ID,但我在运行它的机器上遇到查询使用>4GB RAM 的问题,所以我决定将其拆分为 6k 个单独的查询,当同步保持稳定的内存时 用法。

您可能想要执行以下选项之一:

  1. 编写一个生成所有 6000 个 ID 的子查询,并在原始批量查询中使用该子查询。
  2. 如上所述,但将子查询编写为 CTE
  3. 如果您的 ID 列表来自外部源(即不是来自数据库(,则可以创建一个包含 6000 个 ID 的临时表,然后针对临时表运行原始批量查询

但是,如果您坚持通过 python 运行 6000 个 ID,那么最快的查询可能既不是一次性完成所有 6000 个 ID(这会耗尽内存(,也不是运行 6000 个单独的查询。相反,您可能希望尝试对查询进行分块。例如,一次发送 500 个 ID。您必须试验区块大小,以确定一次可以发送的最大 ID 数,同时仍然在您的内存预算范围内。

最新更新