为什么这个小片段使用multiprocessing与maxtasksperchild,numpy.random.rand



我有一个python脚本,它以随机方式并发处理numpy数组和图像。为了在生成的进程中具有适当的随机性,我将一个随机种子从主进程传递给工人,以便它们被播种。

当我使用maxtasksperchild作为Pool时,我的脚本在多次运行Pool.map后挂起。

以下是重现该问题的最小代码段:

# This code stops after multiprocessing.Pool workers are replaced one single time.
# They are replaced due to maxtasksperchild parameter to Pool
from multiprocessing import Pool
import numpy as np
def worker(n):
# Removing np.random.seed solves the issue
np.random.seed(1) #any seed value
return 1234 # trivial return value
# Removing maxtasksperchild solves the issue
ppool = Pool(20 , maxtasksperchild=5)
i=0
while True:
i += 1
# Removing np.random.randint(10) or taking it out of the loop solves the issue
rand = np.random.randint(10)
l  = [3] # trivial input to ppool.map
result = ppool.map(worker, l)
print i,result[0]

这是输出

1 1234 2 1234 3 1234 . . . 99 1234 100 1234 # 此时,工作人员应该已达到每个子任务的最大任务数 101 1234 102 1234 103 1234 104 1234 105 1234 106 1234 107 1234 108 1234 109 1234 然后 110 1234

无限期挂起。

我可能会用python的random替换numpy.random并解决问题。但是,在我的实际应用程序中,worker将执行我无法控制的用户代码(作为参数提供给worker),并希望允许在该用户代码中使用numpy.random函数。所以我有意想要播种全局随机生成器(为每个进程独立)。

这是用Python 2.7.10,numpy 1.11.0,1.12.0和1.13.0,Ubuntu和OSX测试的。

事实证明,这是来自 Python 错误threading.Lockmultiprocessing的交互。

np.random.seed和大多数np.random.*函数都使用threading.Lock来确保线程安全。np.random.*函数生成一个随机数,然后更新种子(跨线程共享),这就是需要锁的原因。参见 np.random.seed 和 cont0_array(np.random.random()和其他人使用)。

现在这如何导致上面的代码片段出现问题?

简而言之,代码段挂起是因为分叉时继承了threading.Lock状态。因此,当子项在父项中获取锁的同时分叉时(通过np.random.randint(10)),子项将死锁(在np.random.seed)。

@njsmith在此 github 问题中对此进行了解释 https://github.com/numpy/numpy/issues/9248#issuecomment-308054786

多处理。池生成一个后台线程来管理工作线程:https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L170-L173

它在后台循环调用_maintain_pool:https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L366

如果工作线程退出,例如由于最大任务每个子级限制,则_maintain_pool调用_repopulate_pool:https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L240

然后_repopulate_pool分叉了一些新工人,仍然在这个后台线程中:https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L224

所以发生的事情是,最终你会倒霉,在你的主线程调用某个 np.random 函数并保持锁的同时,多处理决定分叉一个孩子,它开始于 np.random 锁已经持有但持有它的线程消失了。然后孩子尝试调用 np.random,这需要锁,因此孩子死锁。

这里的简单解决方法是不在多处理中使用 fork。如果您使用生成或分叉服务器启动方法,那么这应该消失。

为了适当的修复....呸。我想我们..需要注册一个pthread_atfork预分叉处理程序,该处理程序在分叉之前获取 np.random 锁,然后在之后释放它?真的,我想我们需要为 numpy 中的每个锁执行此操作,这需要保留每个 RandomState 对象的弱集,_FFTCache似乎也有一个锁......

(从好的方面来说,这也让我们有机会重新初始化子项中的全局随机状态,在用户没有显式设定种子的情况下,我们确实应该这样做。

使用numpy.random.seed不是线程安全的。numpy.random.seed在全球范围内改变种子的价值,而据我所知,您正在尝试在本地更改种子。

查看文档

如果您确实要实现的是在每个工作线程开始时播种发电机,则以下解决方案:

def worker(n):
# Removing np.random.seed solves the problem                                                               
randgen = np.random.RandomState(45678) # RandomState, not seed!
# ...Do something with randgen...                                           
return 1234 # trivial return value                                                                         

使这是一个完整的答案,因为它不适合评论。

玩了一会儿后,这里的东西闻起来像一个numpy.random bug。我能够重现冻结错误,此外还有其他一些不应该发生的奇怪事情,例如手动播种生成器不工作。

def rand_seed(rand, i):
print(i)
np.random.seed(i)
print(i)
print(rand())
def test1():
with multiprocessing.Pool() as pool:
[pool.apply_async(rand_seed, (np.random.random_sample, i)).get()
for i in range(5)]
test1()

有输出

0
0
0.3205032737431185
1
1
0.3205032737431185
2
2
0.3205032737431185
3
3
0.3205032737431185
4
4
0.3205032737431185

另一方面,不传递np.random.random_sample作为参数就可以了。

def rand_seed2(i):
print(i)
np.random.seed(i)
print(i)
print(np.random.random_sample())
def test2():
with multiprocessing.Pool() as pool:
[pool.apply_async(rand_seed, (i,)).get()
for i in range(5)]
test2()

有输出

0
0
0.5488135039273248
1
1
0.417022004702574
2
2
0.43599490214200376
3
3
0.5507979025745755
4
4
0.9670298390136767

这表明幕后正在发生一些严重的愚蠢行为。不知道还有什么要说的....

基本上,numpy.random.seed似乎不仅修改了"种子状态"变量,还修改了random_sample函数本身。

相关内容

  • 没有找到相关文章

最新更新