使用列表理解的Python多处理问题



我在理解为什么不更新多处理、列表理解输入时遇到问题,特别是下面示例代码中的"val2chg"条目。我不会对此感到惊讶,除非它在没有多处理的情况下运行良好。如果在没有列表理解的情况下传递"相同"列表,则多处理代码是成功的。我相信其他人也遇到过这种情况,但我无法正确地使用我的搜索词来找到答案的链接。

下面是一个简单的例子,结果后面跟着代码。

The input (sld) is
[{'val': 0, 'val2chg': 'not'}, {'val': 1, 'val2chg': 'not'}]
The four results are
multiproc: True. listcomp: True, [{'val': 0, 'val2chg': 'not'}, {'val': 1, 'val2chg': 'not'}]
multiproc: False. listcomp: True, [{'val': 0, 'val2chg': 'changed'}, {'val': 1, 'val2chg': 'changed'}]
multiproc: False. listcomp: False, [{'val': 0, 'val2chg': 'changed'}, {'val': 1, 'val2chg': 'changed'}]
multiproc: True. listcomp: False, [{'val': 0, 'val2chg': 'changed'}, {'val': 1, 'val2chg': 'changed'}]
import multiprocessing as mp
sld = [{'val':0, 'val2chg':'not'}, {'val':1, 'val2chg':'not'}]
print(sld)
def update_values(sld, domp):
global update
def update(idx, some):
some['val2chg'] = 'changed'
return idx, some
if domp:
pool = mp.Pool()
results = [pool.apply_async(update, (idx, sld[idx])) for idx in range(len(sld))]
pool.close()
pool.join()
for result in results:
idx = result.get()[0]
sld[idx] = result.get()[1]
else:
for idx in range(len(sld)):
results = update(idx, sld[idx])
idx = results[0]
sld[idx] = results[1]
domp = True
listcomp = False
if listcomp:
update_values([s for idx, s in enumerate(sld)], domp)
else:
update_values(sld, domp)
print(f'multiproc: {domp}. listcomp: {listcomp}, {sld}')

我会尽力解释这一点:

首先你的声明:

if listcomp:
update_values([s for idx, s in enumerate(sld)], domp)

可以简化为:

if listcomp:
update_values([s for s in sld)], domp)

相当于:

if listcomp:
arg = [s for s in sld)]
update_values(arg, domp)

但是您现在要做的是向update_values传递一个列表arg,它是列表sld的浅副本,而不是sld本身,其中arg[0] == sld[0],即每个列表引用相同的字典元素。

在非多处理情况下,当调用update时,根据domp的值传递对argsld的每个元素的引用。但在任何一种情况下,它们都是sld中包含的相同字典参考。因此,update中的代码some['val2chg'] = 'changed'实际上是就地更新sld引用的字典(同样,这些字典与arg引用的字典相同(。因此,即使listcomp为真,并且update的返回值被用于更新过去的arg列表而不是sld列表,这也无关紧要,因为sld引用的字典元素已经被update更新了。

但是,在多处理情况下,传递给update的是字典引用的序列化/反序列化副本。语句some['val2chg'] = 'changed'正在修改该副本,该副本位于执行该语句的池进程的地址空间中。因此,我们现在必须依靠update_values使用update的返回值来更新传递的sld参数,不幸的是,它不是全局sld列表,而是它的副本arg

以下是更新的代码,与您的代码不同,它只适用于默认情况下使用fork方法创建新进程的平台(例如Linux(,应该适用于所有平台。我还修改了间距以符合PEP8指南:

import multiprocessing as mp
def update(idx, some):
some['val2chg'] = 'changed'
return idx, some
def update_values(sld, domp):
if domp:
pool = mp.Pool()
results = [pool.apply_async(update, (idx, sld[idx])) for idx in range(len(sld))]
pool.close()
pool.join()
for result in results:
idx, some = result.get()
sld[idx] = some
else:
for idx in range(len(sld)):
results = update(idx, sld[idx])
idx = results[0]
sld[idx] = results[1]
if __name__ == '__main__':
domp = True
listcomp = True
sld = [{'val':0, 'val2chg':'not'}, {'val':1, 'val2chg':'not'}]
print(sld)
if listcomp:
#arg = [s for idx, s in enumerate(sld)]
# The above can be simplified to:
arg = [s for s in sld]
# But now arg is a copy of sld and it is the copy that is being modified
update_values(arg, domp)
# We now need this (but we really shouldn't be using a list comprehension at all):
sld = arg
else:
update_values(sld, domp)
print(f'multiproc: {domp}. listcomp: {listcomp}, {sld}')

打印:

[{'val': 0, 'val2chg': 'not'}, {'val': 1, 'val2chg': 'not'}]
multiproc: True. listcomp: True, [{'val': 0, 'val2chg': 'changed'}, {'val': 1, 'val2chg': 'changed'}]

最新更新