Modifying a list inside a multiprocessing Pool's Manager dict



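I have a list of elements that I process in a multiprocessing apply_async task. As each element is processed, I append it to a list stored under a single key of a Manager dict, so that the key ends up mapped to the whole processed list.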

I tried the following code:

#!/usr/bin/python
from multiprocessing import Pool, Manager
def spammer_task(d, my_list):
    #Initialize manager dict
    d['task'] = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        d['task']['processed_list'].append(ele)
    return
p = Pool()
m = Manager()
d = m.dict()
my_list = ["one", "two", "three"]
p.apply_async(spammer_task (d, my_list))
print(d)

In the end, the dict just holds an empty list. Output:

{'task': {'processed_list': []}}
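The empty list is what a Manager dict gives you here: reading d['task'] returns a copy of the stored value (pickled over from the manager process), so .append() on it changes only that temporary copy and nothing is sent back. Only assigning to a key of the proxy (d['task'] = ...) updates the shared dict. A minimal sketch showing both behaviours in the main process; the demo() function is a hypothetical name used only for illustration:

```python
from multiprocessing import Manager


def demo():
    """Hypothetical helper: show which Manager dict updates are kept."""
    with Manager() as manager:
        d = manager.dict()
        d['task'] = {'processed_list': []}

        # d['task'] returns a *copy* of the stored dict, so this append only
        # modifies that temporary copy; the manager never sees it.
        d['task']['processed_list'].append('one')
        after_in_place = d['task']['processed_list']

        # Fetch a copy, modify it locally, then assign it back: the
        # assignment is what actually updates the dict in the manager.
        task = d['task']
        task['processed_list'].append('one')
        d['task'] = task
        after_reassign = d['task']['processed_list']

    return after_in_place, after_reassign


if __name__ == "__main__":
    print(demo())  # ([], ['one'])
```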

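Now, after a bit of research, I learned that elements inside a Manager dict are effectively immutable (changing them in place has no effect), so you have to re-initialize the whole dict with the new data to update it. I therefore tried the code below, but it fails with a strange error.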

#!/usr/bin/python
from multiprocessing import Pool, Manager
def spammer_task(d, my_list):
    #Initialize manager dict
    d['task'] = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        old_list = d['task']['processed_list']
        new_list = old_list.append(ele)
        #Have to do it this way since elements inside a manager dict become
        #immutable so
        d['task'] = {
            'processed_list': new_list
        }
    return
p = Pool()
m = Manager()
d = m.dict()
my_list = ["one", "two", "three"]
p.apply_async(spammer_task (d, my_list))
print(d)

Output:

Traceback (most recent call last):
  File "./a.py", line 29, in <module>
    p.apply_async(spammer_task (d, my_list))
  File "./a.py", line 14, in spammer_task
    new_list = old_list.append(ele)
AttributeError: 'NoneType' object has no attribute 'append'

Somehow it seems to be putting None into the list, and I don't know why.
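The None comes from list.append itself: it modifies the list in place and returns None, so new_list is None and that None is what gets stored under 'processed_list'. On the next iteration old_list is None, which raises the AttributeError. A minimal illustration in plain Python, with no multiprocessing involved:

```python
# list.append() mutates the list in place and returns None.
items = []
result = items.append("one")
print(items)   # ['one']
print(result)  # None

# Concatenation builds a new list instead and leaves the original untouched,
# which is presumably what "new_list = old_list.append(ele)" was meant to do.
old_list = []
new_list = old_list + ["one"]
print(old_list)  # []
print(new_list)  # ['one']
```

In the loop from the question, either `new_list = old_list + [ele]`, or calling `old_list.append(ele)` and then storing `old_list` itself, would avoid writing None back into the dict.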

As per https://bugs.python.org/issue6766, the following code fixes it by copying the whole task dict, modifying the copy, and then assigning it back:

#!/usr/bin/python
from multiprocessing import Pool, Manager
def spammer_task(d, my_list):
    #Initialize manager dict
    d['task'] = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        foo = d['task']
        foo['processed_list'].append(ele)
        d['task'] = foo
    return
p = Pool()
m = Manager()
d = m.dict()
my_list = ["one", "two", "three"]
p.apply_async(spammer_task (d, my_list))
print(d)

Output:

{'task': {'processed_list': ['one', 'two', 'three']}}
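The copy-modify-reassign pattern works because the reassignment is the only step the manager actually sees. On Python 3.6 and later, the multiprocessing documentation also allows shared objects to be nested, so another option is to store manager-created proxies (m.dict(), m.list()) inside the Manager dict; append() on the inner list is then forwarded to the manager process directly. A sketch assuming Python 3.6+, which also passes the function and its arguments to apply_async separately and waits on the result with .get():

```python
from multiprocessing import Manager, Pool


def spammer_task(d, my_list):
    # d['task'] and d['task']['processed_list'] come back as proxies here,
    # so append() is executed on the shared list in the manager process.
    for ele in my_list:
        # process here
        d['task']['processed_list'].append(ele)


if __name__ == "__main__":
    my_list = ["one", "two", "three"]
    with Manager() as m:
        d = m.dict()
        # Nested shared objects (Python 3.6+): the inner dict and list are
        # managed objects themselves, not plain values copied into d.
        d['task'] = m.dict({'processed_list': m.list()})
        with Pool() as p:
            # .get() waits for the worker and re-raises any exception it hit.
            p.apply_async(spammer_task, (d, my_list)).get()
        # Slicing a list proxy returns an ordinary list copy.
        result = {'task': {'processed_list': d['task']['processed_list'][:]}}
    print(result)  # {'task': {'processed_list': ['one', 'two', 'three']}}
```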

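Other than making sure that d actually contains something when it is printed, the code below gives the same result, {'task': {'processed_list': ['one', 'two', 'three']}}, without using a Manager at all: the worker builds its dict locally and returns it as a (key, value) pair, and the pairs are collected into an ordinary dict.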

#!/usr/bin/python
from multiprocessing import Pool

def spammer_task(my_list):
    # Build the result locally in the worker; no Manager dict is needed
    out = {
        'processed_list': []
    }
    for ele in my_list:
        # process here
        out['processed_list'].append(ele)
    return 'task', out

my_list = ["one", "two", "three"]
if __name__ == "__main__":
    p = Pool()
    # dict() consumes the iterator, so this line blocks until every task has finished
    d = dict(p.imap_unordered(spammer_task, [my_list]))
    p.close()
    p.join()
    print(d)
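Note also that `p.apply_async(spammer_task (d, my_list))` in the question calls spammer_task immediately in the parent process and hands its return value (None) to the pool. apply_async takes the function and an argument tuple separately and returns an AsyncResult right away; calling .get() on it waits for the worker and returns its value. A minimal sketch combining that with the Manager-free spammer_task above, assuming Python 3 (for `with Pool()`):

```python
from multiprocessing import Pool


def spammer_task(my_list):
    # Build the result locally in the worker; no Manager is involved.
    out = {'processed_list': []}
    for ele in my_list:
        # process here
        out['processed_list'].append(ele)
    return 'task', out


if __name__ == "__main__":
    my_list = ["one", "two", "three"]
    with Pool() as p:
        # Pass the callable and its arguments separately so the work runs in
        # a pool worker rather than in the parent at submission time.
        async_result = p.apply_async(spammer_task, (my_list,))
        key, value = async_result.get()  # blocks until the worker returns
    d = {key: value}
    print(d)  # {'task': {'processed_list': ['one', 'two', 'three']}}
```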
