我实现了以下代码:
import numpy as np
import time
import multiprocessing as mp
class RandomStater():
def __init__(self, num_per_chunks=1e7, seed = 42):
np.random.RandomState(seed)
self.num_per_chunks = int(num_per_chunks)
self.d = {}
self.i = {}
self.random_numbers = {}
def _choice_n(self, n):
if self.i[n] % self.num_per_chunks == 0:
self.random_numbers[n] = np.random.choice(n, size=self.num_per_chunks)
self.i[n] += 1
return self.random_numbers[n][self.i[n]]
def choice(self, actions, p=None):
num_actions = len(actions)
if num_actions not in self.d:
print('Creating action', actions)
self.i[num_actions] = 0
self.d[num_actions] = self._choice_n
return actions[self.d[num_actions](num_actions)]
class SampleNumber():
def __init__(self):
self._random_start = RandomStater()
def sample_number(self, _):
return self._random_start.choice([1,2,3])
基本上,类RandomStater
是方法的批处理版本np.random.choice
,它预先计算了大量随机样本以供下一步使用。第二个类是测试类,用于按顺序收集这些随机样本。
我想在不同的进程之间并行使用该方法sample_number
。应该发生的是,self.d[num_actions]
和self.i[num_actions]
的值仅在第一个进程第一次执行方法choice
时创建,而对于所有其他调用,计数器self.i[num_actions]
递增,结果非常快地返回。
我想该类应该在进程或其他东西之间共享,但我是多处理的新手,并且不知道如何做到这一点。
到目前为止,我尝试了以下方法,但没有任何运气:
ns = range(0, 2000)
rn = RandomStater()
sn = SampleNumber()
s = time.time()
p = mp.Pool(4)
got = p.map(sn.sample_number, ns)
print(got)
print(time.time() - s)
多线程和并行随机数生成之间存在差异。
像您在问题中拥有的代码并不是为了允许多个进程(线程(并行访问随机数而设计的,因为它是为了减少生成一批连续随机数所需的时间,因为多个线程同时生成它们。 这是多线程随机数生成。(另请参阅 NumPy 文档中的示例代码。
对于您的目的(即并行随机数生成(的更好方法是为每个进程提供自己的 PRNG,并根据公共种子初始化这些 PRNG。 这在我的部分"播种多个进程"和特定于 NumPy 的页面"并行随机数生成"中有详细解释。
另外,请注意,numpy.random.RandomState
现在是 NumPy 1.17 的遗留类;NumPy 1.17引入了一种新的随机数生成系统,该系统使用所谓的位生成器(例如PCG(和随机生成器(例如新的numpy.random.Generator
(。 这是改变RNG政策的提案的结果。