我已经阅读了这里的文档,似乎为了确保Value不会挂起,我们需要使用锁。我就这么做了,但它仍然被卡住了:
from multiprocessing import Process, Value, freeze_support, Lock
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(lock):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with lock:
v.value += 1
# wait for all the processes to finish doing something
while v.value % nb_threads != 0:
pass
if __name__ == '__main__':
freeze_support()
processes = []
lock = Lock()
for i in range(0, 3):
processes.append( Process( target=run_process, args=(lock,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
我尝试过使用锁访问该值,但它仍然会阻止:
val = -1
while val % nb_threads != 0:
with lock:
val = v.value
我该怎么解决这个问题?感谢
您的代码有一个竞赛条件;您不能保证所有三个进程在允许它们继续前进之前都脱离while v.value % nb_threads != 0
循环。这允许其中一个或两个进程继续前进到while i < nbloops
循环的下一次迭代,递增v.value
,然后防止其余进程脱离自己的while v.value % nb_threads != 0
循环。您试图在那里进行的同步最好由Barrier
处理,而不是循环并重复检查值。
此外,默认情况下,multiprocessing.Value
也有一个内置的同步,您可以通过调用Value.get_lock
来显式访问它用于同步的Lock
,因此不需要为每个进程显式地使用自己的Lock
。合在一起,你就有了:
from multiprocessing import Process, Value, freeze_support, Lock, Barrier
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(barrier):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with v.get_lock():
v.value += 1
# wait for all the processes to finish doing something
out = barrier.wait()
if __name__ == '__main__':
freeze_support()
processes = []
b = Barrier(nb_threads)
for i in range(0, nb_threads):
processes.append( Process( target=run_process, args=(b,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
Barrier
保证,在所有进程都调用了Barrier.wait()
之前,任何进程都不能进入循环的下一次迭代,此时,所有进程都能够同时进行。Barrier
对象支持重用,因此可以在每次迭代中安全地调用它。