如何与子进程共享父进程的 numpy 随机状态



我在程序开始时设置了numpy随机种子。在程序执行期间,我使用multiprocessing.Process多次运行函数。该函数使用 numpy 随机函数来绘制随机数。问题是Process获取当前环境的副本。因此,每个进程都是独立运行的,它们都以与父环境相同的随机种子启动。

所以我的问题是我如何与子进程环境共享父环境中 numpy 的随机状态?请注意,我想在工作中使用Process,并且需要使用单独的类并在该中单独执行import numpy。我尝试使用multiprocessing.Manager来共享随机状态,但似乎事情没有按预期工作,我总是得到相同的结果。另外,如果我将 for 循环移动到drawNumpySamples内部或将其保留在main.py中都没有关系;我仍然无法获得不同的数字,并且随机状态总是相同的。这是我代码的简化版本:

# randomClass.py
import numpy as np
class myClass(self):
def __init__(self, randomSt):
print ('setup the object')
np.random.set_state(randomSt)
def drawNumpySamples(self, idx)
np.random.uniform()

在主文件中:

# main.py
import numpy as np
from multiprocessing import Process, Manager
from randomClass import myClass
np.random.seed(1) # set random seed
mng = Manager()
randomState = mng.list(np.random.get_state())
myC = myClass(randomSt = randomState)
for i in range(10):
myC.drawNumpySamples() # this will always return the same results

注意:我使用Python 3.5。我还在Numpy的GitHub页面上发布了一个问题。只需在此处发送问题链接以供将来参考。

即使你设法让它工作,我认为它也不会做你想要的。一旦有多个进程并行地从相同的随机状态拉取,它们各自到达状态的顺序就不再是确定的,这意味着您的运行实际上将不可重复。可能有办法解决这个问题,但这似乎是一个不平凡的问题。

同时,有一个解决方案可以解决您想要的问题和非确定性问题:

在生成子进程之前,请向 RNG 请求一个随机数,并将其传递给子进程。然后,孩子可以用这个数字播种。然后,每个子项将具有与其他子项不同的随机序列,但如果您使用固定种子重新运行整个应用,则同一子项将获得相同的随机序列。

如果您的主进程执行任何其他可能不确定地依赖于子进程执行的 RNG 工作,则需要在拉取任何其他随机数之前按顺序为所有子进程预生成种子。


正如senderle在评论中指出的那样:如果你不需要多个不同的运行,而只需要一个固定的运行,你甚至不需要从你的种子RNG中提取种子;只需使用一个从1开始的计数器,并为每个新进程递增它,并将其用作种子。我不知道这是否可以接受,但如果可以接受,很难比这更简单。

正如 Amir 在评论中指出的那样:更好的方法是每次生成新进程时绘制一个随机整数,并将该随机整数传递给新进程,以使用该整数设置 numpy 的随机种子。这个整数确实可以来自np.random.randint().

每次获得随机数时,都需要更新Manager的状态:

import numpy as np
from multiprocessing import Manager, Pool, Lock
lock = Lock()
mng = Manager()
state = mng.list(np.random.get_state())
def get_random(_):
with lock:
np.random.set_state(state)
result = np.random.uniform()
state[:] = np.random.get_state()
return result
np.random.seed(1)
result1 = Pool(10).map(get_random, range(10))
# Compare with non-parallel version
np.random.seed(1)
result2 = [np.random.uniform() for _ in range(10)]
# result of Pool.map may be in different order
assert sorted(result1) == sorted(result2)

幸运的是,根据文档,您可以使用get_state访问 numpy 随机数生成器的完整状态,并使用set_state再次设置它。 生成器本身使用 Mersenne Twister 算法(请参阅文档的RandomState部分)。

这意味着你可以做任何你想做的事情,尽管它是否会是好的有效的完全是一个不同的问题。 正如abarnert指出的那样,无论你如何共享父级的状态——这可以使用Alex Hall的方法,这种方法看起来是正确的——你在每个子级中的排序将取决于每个子级从MT状态机中抽取随机数的顺序。

也许最好为每个孩子构建一个大型伪随机数池,在开始时保存整个生成器的启动状态一次。 然后,每个子级都可以绘制一个 PRNG 值,直到其特定池用完,之后您可以让子级与父级协调下一个池。 父级枚举哪些孩子获得了哪个"池"号码。 代码看起来像这样(请注意,使用next方法将其转换为无限生成器是有意义的):

class PrngPool(object):
def __init__(self, child_id, shared_state):
self._child_id = child_id
self._shared_state = shared_state
self._numbers = []
def next_number(self):
if not self.numbers:
self._refill()
return self.numbers.pop(0)  # XXX inefficient
def _refill(self):
# ... something like Alex Hall's lock/gen/unlock,
# but fill up self._numbers with the next 1000 (or
# however many) numbers after adding our ID and
# the index "n" of which n-through-n+999 numbers
# we took here.  Any other child also doing a
# _refill will wait for the lock and get an updated
# index n -- eg, if we got numbers 3000 to 3999,
# the next child will get numbers 4000 to 4999.

这样,通过管理器项(MT 状态和我们的 ID 和索引添加到"已使用"列表中)的通信就没有那么多了。 在该过程结束时,可以看到哪些子使用了哪些 PRNG 值,并在需要时重新生成这些 PRNG 值(请记住记录完整的 MT 内部启动状态!

编辑添加:思考这个问题的方式是这样的:机器翻译实际上不是随机的。 它是周期性的,周期很长。 当您使用任何此类 RNG 时,您的种子只是该时间段内的一个起点。 要获得可重复性,您必须使用随机数,例如书中的一组。 有一本(虚拟)书,其中包含来自 MT 生成器的每个数字。 我们将写下本书中用于每组计算的哪一页,以便我们稍后可以重新打开这本书并重新进行相同的计算。

您可以使用np.random.SeedSequence.请参阅 https://numpy.org/doc/stable/reference/random/parallel.html:

from numpy.random import SeedSequence, default_rng
ss = SeedSequence(12345)
# Spawn off 10 child SeedSequences to pass to child processes.
child_seeds = ss.spawn(10)
streams = [default_rng(s) for s in child_seeds]

这样,你们每个线程/进程都将获得一个统计上独立的随机生成器。

最新更新