我们如何在Julia中并行生成随机数



我正在为蒙特卡罗模拟编写一个并行的julia代码。这需要我在不同的内核上并行生成随机数。在我的工作站上的一个简单测试代码中,我尝试在4个核心上生成随机数,得到了以下结果:

julia -p 4
julia> @everywhere using Random
julia> @everywhere x = randn(1)
julia> remotecall_fetch(println,1,x[1])
-1.9348951407543997
julia> remotecall_fetch(println,2,x[1])
From worker 2:    -1.9348951407543997
julia> remotecall_fetch(println,3,x[1])
From worker 3:    -1.9348951407543997
julia> remotecall_fetch(println,4,x[1])
From worker 4:    -1.9348951407543997

我不明白为什么从不同的过程中提取的数字会给出完全相同的结果。我不确定错误是什么。我的理解是,使用@everywhere宏可以让你在所有进程上并行运行同一段代码。我现在的电脑是julia 1.6.0。非常感谢。

更新:感谢您的回复。基本上,我要找的是一个赋值语句,比如x=y,其中x和y都是工作进程的本地语句。我试过这样的东西:

julia -p 4
@sync @distributed for i = 1:2
x = randn(1)
println(x)
end
From worker 3:    [0.4451131733445428]
From worker 2:    [-0.4875627629008678]
Task (done) @0x00007f1d92037340
julia> remotecall_fetch(println,2,x)
ERROR: UndefVarError: x not defined
Stacktrace:
[1] top-level scope
@ REPL[23]:1

这似乎在每个过程中独立生成随机数。但是,我不知道如何访问变量x了。我尝试了remotecall_fetch(println, 2,x),但变量x似乎没有在工作进程上定义。这让人非常困惑。

我希望有一个好的流程图或好的文档来解释并行计算过程中Julia中变量和表达式的范围。

remotecall_fetch从本地进程(id为1(发送x[1]进行评估。您可以通过运行以下代码进行检查:

# julia -p 4
julia> @everywhere x = myid() # make sure x holds a worker number
julia> remotecall_fetch(println, 4, x) # x passed from worker 1 (local machine) to println
From worker 4:    1
julia> @sync @everywhere println(x) # x is evaluated on worker
1
From worker 3:    3
From worker 2:    2
From worker 4:    4
From worker 5:    5
julia> @sync @everywhere println($x) # x interpolated from local machine
1
From worker 4:    1
From worker 5:    1
From worker 3:    1
From worker 2:    1

关于远程机器上的随机数生成,您应该确保在每个机器上创建独立的随机数流。对于大多数情况来说,最简单的方法就是在不同的工人身上使用具有不同种子的Random.seed!函数。如果您想格外小心,请使用Future.randjump来确保工作进程上的随机数生成器没有重叠。

最好的方法是拥有一个随机状态,并将其分割为多个部分,其中每个工作者都有一个部分。可以这样做:

using Distributed
addprocs(4)
@everywhere import Future, Random
@everywhere const rng = Future.randjump(Random.MersenneTwister(0), myid()*big(10)^20)

现在,每个工作者都有一个局部工作者,但在Julia意义上是全局rng变量。

在这个例子中,我使用了0作为随机数种子。手册中建议使用big(10)^20randjump大小,因为在Julia中,此步骤已经预先计算了值。

要使用这样的rng,您可以定义一个函数,例如:

@everywhere getr(rng=rng) = rand(rng, 5)

可以称为

fetch(@spawnat 2 getr())

基本上是rng,因为它是global,所以它应该作为最外层的参数传递给您在远程工作者上调用的任何对象,或者定义为const,如注释中所述。

你在问题的顶部说"在不同的核心上";。这是我使用Threads.@thread宏在基于线程的并行性上生成随机数的函数所使用的解决方案。它的优点是,我可以根据运行代码时的需要,选择我需要的随机性级别:

using Random, Test, Statistics
FIXEDRNG = MersenneTwister(123)
println("** Testing generateParallelRngs()...")
x = rand(copy(FIXEDRNG),100)
function innerExpensiveFunction(bootstrappedx; rng=Random.GLOBAL_RNG)
sum(bootstrappedx .* rand(rng) ./ 0.5)
end
function outerFunction(x;rng = Random.GLOBAL_RNG)
masterSeed = rand(rng,100:9999999999999) 
rngs       = [deepcopy(rng) for i in 1:Threads.nthreads()]  # make new copy instances
results    = Array{Float64,1}(undef,30)
Threads.@threads for i in 1:30
tsrng         = rngs[Threads.threadid()]    # Thread safe random number generator: one RNG per thread
Random.seed!(tsrng,masterSeed+i*10)         # But the seeding depends on the i of the loop not the thread: we get same results indipendently of the number of threads
toSample      = rand(tsrng, 1:100,100)
bootstrappedx = x[toSample]
innerResult   = innerExpensiveFunction(bootstrappedx, rng=tsrng)
results[i]    = innerResult
end
overallResult = mean(results)
return overallResult
end

# Different sequences..
@test outerFunction(x) != outerFunction(x)
# Different values, but same sequence
mainRng = copy(FIXEDRNG)
a = outerFunction(x, rng=mainRng)
b = outerFunction(x, rng=mainRng)
mainRng = copy(FIXEDRNG)
A = outerFunction(x, rng=mainRng)
B = outerFunction(x, rng=mainRng)
@test a != b && a == A && b == B

# Same value at each call
a = outerFunction(x,rng=copy(FIXEDRNG))
b = outerFunction(x,rng=copy(FIXEDRNG))
@test a == b

最新更新