How to use a shared/managed dict with a process pool (Python 3.x)



I am working on a project that requires me to extract a large amount of information from some files. The format and most of the details of the project are not important to what I'm asking. What I mainly don't understand is how to share a dictionary with all of the processes in a process pool.

Here is my code (variable names changed, and most of the code stripped out to show only the relevant parts):

import json
import multiprocessing
from multiprocessing import Pool, Lock, Manager
import glob
import os
def record(thing, map):
    with mutex:
        if(thing in map):
            map[thing] += 1
        else:
            map[thing] = 1

def getThing(file, n, map):
    #do stuff
    thing = file.read()
    record(thing, map)

def init(l):
    global mutex
    mutex = l
def main():
    #create a manager to manage shared dictionaries
    manager = Manager()
    #get the list of filenames to be analyzed
    fileSet1=glob.glob("filesSet1/*")
    fileSet2=glob.glob("fileSet2/*")
    #create a global mutex for the processes to share
    l = Lock()   
    map = manager.dict()
    #create a process pool, give it the global mutex, and max cpu count-1 (manager is its own process)
    with Pool(processes=multiprocessing.cpu_count()-1, initializer=init, initargs=(l,)) as pool:
        pool.map(lambda file: getThing(file, 2, map), fileSet1) #This line is what i need help with
main()

From what I understand, the lambda function should work. The line I need help with is: pool.map(lambda file: getThing(file, 2, map), fileSet1). It gives me an error. The error given is "AttributeError: Can't pickle local object 'main.<locals>.<lambda>'".

Any help would be greatly appreciated!

In order to execute the tasks in parallel, multiprocessing "pickles" the task function. In your case, this "task function" is lambda file: getThing(file, 2, map).

Unfortunately, lambda functions cannot be pickled in Python by default (see also this StackOverflow post). Let me illustrate the problem with a minimal amount of code:

import multiprocessing
l = range(12)
def not_a_lambda(e):
    print(e)
def main():
    with multiprocessing.Pool() as pool:
        pool.map(not_a_lambda, l)        # Case (A)
        pool.map(lambda e: print(e), l)  # Case (B)
main()

In case (A) we have a proper, free function that can be pickled, so the pool.map operation will work. In case (B) we have a lambda function, and a crash will occur.

One possible solution is to use a proper module-scope function (such as my not_a_lambda above; a sketch of this approach adapted to your code follows the Worker example below). Another solution is to rely on a third-party module, such as dill, to extend the pickling functionality; in that case you would use something like pathos as a replacement for the regular multiprocessing module. Finally, you can create a Worker class that carries your shared state as members. That could look like this:

import multiprocessing
class Worker:
    def __init__(self, mutex, map):
        self.mutex = mutex
        self.map = map
    def __call__(self, e):
        print("Hello from Worker e=%r" % (e, ))
        with self.mutex:
            k, v = e
            self.map[k] = v
        print("Goodbye from Worker e=%r" % (e, ))
def main():
    manager = multiprocessing.Manager()
    mutex = manager.Lock()
    map = manager.dict()
    # there is only ONE Worker instance which is shared across all processes
    # thus, you need to make sure you don't access / modify internal state of
    # the worker instance without locking the mutex.
    worker = Worker(mutex, map)
    items = [("a", 1), ("b", 2), ("c", 3)]  # example (key, value) payload
    with multiprocessing.Pool() as pool:
        pool.map(worker, items)
main()
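For completeness, here is what the first option (a module-scope function) could look like when applied to your original code: the managed dict and the lock are handed to every worker process through the Pool initializer, and the mapped function lives at module scope so it can be pickled. This is only a minimal, untested sketch; the names process_file, shared_map and lock are placeholders I chose, not part of your code.

import glob
import multiprocessing
from multiprocessing import Pool, Manager

def init(l, m):
    # runs once in every worker process; keep the shared objects as globals
    global mutex, shared_map
    mutex = l
    shared_map = m

def record(thing):
    # guard the read-modify-write on the managed dict with the lock
    with mutex:
        shared_map[thing] = shared_map.get(thing, 0) + 1

def process_file(filename):
    # module-scope function, so pool.map can pickle it
    with open(filename) as f:
        thing = f.read()
    record(thing)

def main():
    manager = Manager()
    shared = manager.dict()
    lock = manager.Lock()
    files = glob.glob("filesSet1/*")
    with Pool(processes=multiprocessing.cpu_count() - 1,
              initializer=init, initargs=(lock, shared)) as pool:
        pool.map(process_file, files)
    print(dict(shared))

if __name__ == "__main__":
    main()

If you would rather keep the lambda, the third-party pathos package (which serializes with dill instead of pickle) can usually handle it, at the cost of an extra dependency.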
