我正在使用Python的multiprocessing
来创建并行应用程序。进程需要共享一些数据,为此我使用Manager
.但是,我有一些常见的函数,这些函数需要进程调用,并且需要访问Manager
对象存储的数据。我的问题是我是否可以避免需要将Manager
实例作为参数传递给这些常见函数,而是像全局一样使用它。换句话说,请考虑以下代码:
import multiprocessing as mp
manager = mp.Manager()
global_dict = manager.dict(a=[0])
def add():
global_dict['a'] += [global_dict['a'][-1]+1]
def foo_parallel(var):
add()
print var
num_processes = 5
p = []
for i in range(num_processes):
p.append(mp.Process(target=foo_parallel,args=(global_dict,)))
[pi.start() for pi in p]
[pi.join() for pi in p]
这运行良好,并在我的机器上返回p=[0,1,2,3,4,5]
。然而,这是"好形式"吗?这是否是一种好方法,就像定义add(var)
并调用add(var)
一样好吗?
您的代码示例似乎比表单存在更大的问题。只有运气好,您才能获得所需的输出。重复执行会产生不同的结果。那是因为+=
不是原子操作。多个进程可以一个接一个地读取相同的旧值,然后它们中的任何一个更新它,它们将写回相同的值。为了防止这种行为,您必须另外使用Manager.Lock
。
对于您关于"良好形式"的原始问题。
IMO 会更干净,让子进程的主函数foo_parallel
,global_dict
显式传递到泛型函数add(var)
中。这将是依赖注入的一种形式,并且具有一些优点。在您的示例中,并非详尽无遗:
允许隔离测试
提高代码可重用性
更轻松的调试(检测托管对象的不可访问性不应延迟到调用
add
(快速失败(更少的样板代码(例如,多个函数所需的资源上的 try-excepts 块(
作为旁注。仅对其副作用使用列表推导被认为是"代码气味"。如果您不需要列表作为结果,只需使用 for 循环即可。
法典:
import os
from multiprocessing import Process, Manager
def add(l):
l += [l[-1] + 1]
return l
def foo_parallel(global_dict, lock):
with lock:
l = global_dict['a']
global_dict['a'] = add(l)
print(os.getpid(), global_dict)
if __name__ == '__main__':
N_WORKERS = 5
with Manager() as manager:
lock = manager.Lock()
global_dict = manager.dict(a=[0])
pool = [Process(target=foo_parallel, args=(global_dict, lock))
for _ in range(N_WORKERS)]
for p in pool:
p.start()
for p in pool:
p.join()
print('result', global_dict)