multiprocessing.Pool: map multiple arguments with a shared value (solved)



I am practicing multiprocessing with shared values. I have an existing process function that works with a shared value:

def run_procs_with_loop(lock):
    # this is my shared value
    shared_number = Value('i', 0)
    print(__name__, 'shared_value in the beginning', shared_number.value)
    # create a process list to append each process spawned by the for-loop
    processes = []
    for _ in range(2):
        p = Process(target=add_100_with_lock, args=(shared_number, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print('shared_value at the end', shared_number.value)

The function above spawns two processes, each pointed at the task function with args (shared_number, lock). It runs as expected.

I tried converting it to a multiprocessing pool - I attempted to pass the argument `[(shared_number, lock)] * 2` in the Pool.map() call (I want the pool to run just two tasks), but Python rejected it:

def run_procs_with_pool(lock):
    shared_number = Value('i', 0)
    print(__name__, 'shared_value in the beginning', shared_number.value)
    # create processes using multiprocessing.Pool
    pool = Pool()
    pool.map(add_100_with_lock, [(shared_number, lock)] * 2)
    print('shared_value at the end', shared_number.value)

Thanks in advance for any helpful input.


Update:

It was suggested that I use starmap instead of map, but then I get the error RuntimeError: Synchronized objects should only be shared between processes through inheritance. It looks like Pool.starmap() does not allow shared values to be passed this way?

In case it helps, here is the task function add_100_with_lock:

def add_100_with_lock(num, lock):
    for _ in range(100):
        time.sleep(0.001)
        with lock:
            num.value += 1

Is there a way to make passing shared values work with multiprocessing.Pool?

When you write

pool.map(add_100_with_lock, [(shared_number,lock)] * 2)

the iterable you pass as an argument is a list of tuples, so add_100_with_lock does not receive two arguments but a single tuple, as if you had called add_100_with_lock((shared_number, lock,)) instead of add_100_with_lock(shared_number, lock). Pool.map is implemented for functions that take only one argument.
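This single-argument behaviour is easy to see in isolation. A small sketch, using multiprocessing.dummy (whose thread-backed Pool exposes the same API, which keeps the example self-contained and avoids pickling); the describe helper is hypothetical:

```python
from multiprocessing.dummy import Pool  # same Pool API, backed by threads

def describe(arg):
    # Pool.map calls the worker with exactly one argument per item,
    # so each (a, b) tuple arrives intact instead of being unpacked
    return type(arg).__name__, len(arg)

with Pool(2) as pool:
    results = pool.map(describe, [(1, 2)] * 2)

print(results)  # [('tuple', 2), ('tuple', 2)]
```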

You could change the definition of add_100_with_lock, although I do not recommend this solution. You can also wrap it in another function that receives the tuple and unpacks it, i.e.:

def wrap_add_100(args):
    return add_100_with_lock(*args)
...
pool.map(wrap_add_100, [(shared_number, lock)] * 2)

Or use Pool.starmap, which expects an iterable of iterables and unpacks each inner iterable into the function's arguments:

pool.starmap(add_100_with_lock, [(shared_number, lock)] * 2)

This last option is the one I recommend, since it preserves the function signature.
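The map/starmap difference can also be shown with plain picklable values (add_pair is a hypothetical stand-in for a two-argument worker; shared Value objects would still hit the RuntimeError mentioned in the question):

```python
from multiprocessing.dummy import Pool  # same Pool API, backed by threads

def add_pair(a, b):
    return a + b

with Pool(2) as pool:
    # starmap unpacks each inner tuple into positional arguments,
    # calling add_pair(1, 2) and add_pair(3, 4)
    sums = pool.starmap(add_pair, [(1, 2), (3, 4)])

print(sums)  # [3, 7]
```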

I was finally able to work around the multiprocessing.Pool limitation on shared variables by using a Manager() object. Per the Python docs: Managers provide a way to create data "which can be shared between different processes", including sharing over a network between processes running on different machines.

Here is how I did it:

# use the manager class to share objects between processes
manager = Manager()
shared_number = manager.Value('i', 0)

Since I was now passing only shared_number (the lock object is handed over at Pool creation time with the initializer= kwarg - you can read all about it in the multiprocessing lock() discussion here), I could go back to using Pool.map() instead of Pool.starmap().

Here is the complete working module:

from multiprocessing import Lock, Pool, Manager
import time

# init function passed to the Pool initializer to share a multiprocessing.Lock() with worker processes
def init_lock(l):
    global lock
    lock = l

def add_100_with_lock(num):
    # Since our pool runs TWO tasks, and both share the 'num' variable,
    # 'num' will be 200 after both are done executing (100 * 2 parallel tasks = 200).
    # I applied multiprocessing locking here to avoid race conditions between worker processes.
    for _ in range(100):
        time.sleep(0.001)
        with lock:
            num.value += 1

# Pool method
def run_procs_lock_with_pool():

    # use the Manager class to share objects between processes
    manager = Manager()
    shared_number = manager.Value('i', 0)
    print(__name__, 'shared_value in the beginning', shared_number.value)
    # Like shared values, locks cannot be passed through Pool.map() - instead, pass the
    # multiprocessing.Lock() at Pool creation time, using initializer=init_lock.
    # This will make your lock instance global in all the child workers.
    # init_lock is defined as a function - see init_lock() at the top.
    l = Lock()
    pool = Pool(initializer=init_lock, initargs=(l,))
    # submit two tasks to the pool with the arg "[shared_number]*2"
    pool.map(add_100_with_lock, [shared_number]*2)

    print('shared_value at the end', shared_number.value)

if __name__ == '__main__':
    run_procs_lock_with_pool()
