"InterfaceError: connection already closed" when using multiprocessing.Pool on a black-box function that queries a PostgreSQL database



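I have been given a Python (2.7) function that takes 3 strings as arguments and returns a list of dictionaries. Due to the nature of the project, I can't alter the function. It is quite complex, calls several other non-standard Python modules, and queries a PostgreSQL database using psycopg2. I think it is the Postgres functionality that is causing my problem.

I want to use the multiprocessing module to speed up calling the function hundreds of times. I have written a "helper" function so that I can use multiprocessing.Pool with my function, which takes multiple arguments.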

from function_script import function
def function_helper(args):
    return function(*args)

My main code looks like this:

from helper_script import function_helper
from multiprocessing import Pool
argument_a = ['a0', 'a1', ..., 'a99']
argument_b = ['b0', 'b1', ..., 'b99']
argument_c = ['c0', 'c1', ..., 'c99']
input = zip(argument_a, argument_b, argument_c)
p = Pool(4)
results = p.map(function_helper, input)
print results

I expected a list of dictionaries, but instead I get the following errors:

Traceback (most recent call last):
  File "/local/python/2.7/lib/python2.7/site-packages/variantValidator/variantValidator.py", line 898, in validator
    vr.validate(input_parses)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/validator.py", line 33, in validate
    return self._ivr.validate(var, strict) and self._evr.validate(var, strict)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/validator.py", line 69, in validate
    (res, msg) = self._ref_is_valid(var)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/validator.py", line 89, in _ref_is_valid
    var_x = self.vm.c_to_n(var) if var.type == "c" else var
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/variantmapper.py", line 223, in c_to_n
    tm = self._fetch_TranscriptMapper(tx_ac=var_c.ac, alt_ac=var_c.ac, alt_aln_method="transcript")
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/decorators/lru_cache.py", line 176, in wrapper
    result = user_function(*args, **kwds)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/variantmapper.py", line 372, in _fetch_TranscriptMapper
    self.hdp, tx_ac=tx_ac, alt_ac=alt_ac, alt_aln_method=alt_aln_method)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/transcriptmapper.py", line 69, in __init__
    self.tx_identity_info = hdp.get_tx_identity_info(self.tx_ac)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/decorators/lru_cache.py", line 176, in wrapper
    result = user_function(*args, **kwds)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/dataproviders/uta.py", line 353, in get_tx_identity_info
    rows = self._fetchall(self._queries['tx_identity_info'], [tx_ac])
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/dataproviders/uta.py", line 216, in _fetchall
    with self._get_cursor() as cur:
  File "/local/python/2.7/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/dataproviders/uta.py", line 529, in _get_cursor
    cur.execute("set search_path = " + self.url.schema + ";")
  File "/local/python/2.7/lib/python2.7/site-packages/psycopg2/extras.py", line 144, in execute
    return super(DictCursor, self).execute(query, vars)
DatabaseError: SSL error: decryption failed or bad record mac

and:

Traceback (most recent call last):
  File "/local/python/2.7/lib/python2.7/site-packages/variantValidator/variantValidator.py", line 898, in validator
    vr.validate(input_parses)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/validator.py", line 33, in validate
    return self._ivr.validate(var, strict) and self._evr.validate(var, strict)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/validator.py", line 69, in validate
    (res, msg) = self._ref_is_valid(var)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/validator.py", line 89, in _ref_is_valid
    var_x = self.vm.c_to_n(var) if var.type == "c" else var
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/variantmapper.py", line 223, in c_to_n
    tm = self._fetch_TranscriptMapper(tx_ac=var_c.ac, alt_ac=var_c.ac, alt_aln_method="transcript")
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/decorators/lru_cache.py", line 176, in wrapper
    result = user_function(*args, **kwds)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/variantmapper.py", line 372, in _fetch_TranscriptMapper
    self.hdp, tx_ac=tx_ac, alt_ac=alt_ac, alt_aln_method=alt_aln_method)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/transcriptmapper.py", line 69, in __init__
    self.tx_identity_info = hdp.get_tx_identity_info(self.tx_ac)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/decorators/lru_cache.py", line 176, in wrapper
    result = user_function(*args, **kwds)
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/dataproviders/uta.py", line 353, in get_tx_identity_info
    rows = self._fetchall(self._queries['tx_identity_info'], [tx_ac])
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/dataproviders/uta.py", line 216, in _fetchall
    with self._get_cursor() as cur:
  File "/local/python/2.7/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/local/python/2.7/lib/python2.7/site-packages/hgvs/dataproviders/uta.py", line 526, in _get_cursor
    conn.autocommit = True
InterfaceError: connection already closed

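Does anyone know what is causing Pool to behave like this, when it has worked in the other examples I have tried? If this isn't enough to go on, can anyone advise me on how to get to the bottom of the problem (this is my first time working with someone else's code)? Alternatively, is there another way to use the multiprocessing module to call the function hundreds of times?

Thanks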

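I think what may be happening is that a single connection object is being shared by all of the workers. When one worker finishes all of its tasks it closes the connection, but the other workers are still running, so the next one that tries to use the database finds the connection already closed. That would also fit the SSL error: if several processes write to one inherited socket, the encrypted stream can get corrupted.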
