在池进程之间共享队列对象



我使用pathosProcessingPool类来调度跨多个内核并发执行run_regex()函数。该函数将正则表达式作为参数,并计算匹配项的列表条目。如果找到匹配项,则会将匹配值放入result_queue中。

据我了解,目前每个工作进程都在其虚拟地址空间中创建result_queue的本地副本。但是,我想将此 Queue 对象用作共享内存机制,以便从主进程访问所有匹配项。

问题:

  1. 有没有办法将队列对象传递到池初始值设定项中,以便队列充当共享内存部分?
  2. 是否需要与队列对象同步?
  3. 有没有更好的方法来解决这个问题?

代码片段

from multiprocessing import Lock, Queue
from pathos.multiprocessing import ProcessingPool
result_queue = Queue()
lock = Lock()
data = {}
def run_regex(self, expr):
for key, value in data.iteritems():
matchStr = re.search(expr, key, re.I)
if matchStr:
lock.acquire()
result_queue.put(key)
lock.release()
break
def check_path(self):
pool = ProcessingPool()
pool.map(run_regex, in_regex)
  1. 是的,您可以查看Pool对象的初始值设定项参数。
  2. Queue对象已经是 MP 安全的,因此无需保护它们。
  3. 不需要Queue即可从run_regex函数返回值。您只需从函数返回key,它就会在map结果中可用。

    def run_regex(expr):
    group = []
    for key, value in data.iteritems():
    match = re.search(expr, key, re.I)
    if match is not None:
    group.append(key)
    return group
    groups = pool.map(run_regex, in_regex)
    keys = [key for group in groups for key in group]
    

    keys = list(itertools.chain.from_iterable(groups))
    

    map将返回按run_regex分组的密钥。之后,您可以轻松地展平列表。

最新更新