在 Julia 中使用并行处理填充矩阵



我正在尝试通过并行处理加快 Julia (v. 0.5.0( 中动态编程问题的求解时间。该问题涉及在每次迭代中为 1073 x 19 矩阵的每个元素选择最佳值,直到连续矩阵差异落在容差范围内。我认为,在每次迭代中,填充矩阵每个元素的值可以并行化。但是,我看到使用SharedArray时性能大幅下降,我想知道是否有更好的方法来解决此问题的并行处理。

我为下面的函数构造参数:

est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]
r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = (1+r)^(-1)
sigma_range = 4
gz = 19 
gp = 29 
gk = 37 
lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp(lnz)
gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))
#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float32,N,N)
Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]
zstep = (Z[N] - Z[1]) / (N - 1)
for i=2:(N-1)
Z[i] = Z[1] + zstep * (i - 1)
end
for a = 1 : N
for b = 1 : N
if b == 1
Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
elseif b == N
Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
else
Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
end
end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float32,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)

gm = gk * gp
control=zeros(gm,2)
for i=1:gk
control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)
E=Array(Float32,gm,gm,gz)
for h=1:gm
for m=1:gm
for j=1:gz
# set the nonzero net debt indicator
if endog[h,2]<0
p_ind=1
else
p_ind=0
end
# set the investment indicator
if (control[m,1]-(1-delta)*endog[h,1])!=0
i_ind=1
else
i_ind=0
end
E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
(i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
s*endog[h,2]*p_ind
elem = E[m,h,j]
if E[m,h,j]<0
E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
else
E[m,h,j]=elem
end
end
end
end

然后,我用串行处理构造了函数。两个for循环遍历每个元素,以查找 1072 大小(=函数中gm标量参数(数组中的最大值:

function dynam_serial(E,gm,gz,beta,Zprob)
v           = Array(Float32,gm,gz )
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv          = Array(Float32,gm,gz)
# Set parameters for the loop
convcrit = 0.0001   # chosen convergence criterion
diff = 1          # arbitrary initial value greater than convcrit
while diff>convcrit
exp_v=v*Zprob'
for h=1:gm
for j=1:gz
Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
end
end
diff = maxabs(Tv - v)
v=copy(Tv)
end
end

计时,我得到:

@time dynam_serial(E,gm,gz,beta,Zprob)
> 106.880008 seconds (91.70 M allocations: 203.233 GB, 15.22% gc time)

现在,我尝试使用共享阵列从并行处理中受益。请注意,我重新配置了迭代,以便只有一个for循环,而不是两个。我也使用v=deepcopy(Tv);否则,v被复制为Array对象,而不是SharedArray

function dynam_parallel(E,gm,gz,beta,Zprob)
v           = SharedArray(Float32,(gm,gz),init = S -> S[Base.localindexes(S)] = myid() )
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
# Set parameters for the loop
convcrit = 0.0001   # chosen convergence criterion
diff = 1          # arbitrary initial value greater than convcrit
while diff>convcrit
exp_v=v*Zprob'
Tv          = SharedArray(Float32,gm,gz,init = S -> S[Base.localindexes(S)] = myid() )
@sync @parallel for hj=1:(gm*gz)
j=cld(hj,gm)
h=mod(hj,gm)
if h==0;h=gm;end;
@async Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
end
diff = maxabs(Tv - v)
v=deepcopy(Tv)
end
end

计时并行版本;并使用具有16GB内存的4核2.5 GHz I7处理器,我得到:

addprocs(3)
@time dynam_parallel(E,gm,gz,beta,Zprob)
> 164.237208 seconds (2.64 M allocations: 201.812 MB, 0.04% gc time)

我在这里做错了什么吗?或者有没有更好的方法来处理 Julia 中的并行处理这个特定问题?我已经考虑过使用分布式阵列,但我很难看到如何将它们应用于当前的问题。

更新:根据@DanGetz和他的有用评论,我转而尝试加快串行处理版本。我能够通过以下方式将性能降低到53.469780 seconds (67.36 M allocations: 103.419 GiB, 19.12% gc time)

1(升级到0.6.0(节省约25秒(,其中包括有用的@views宏。

2( 根据 Julia 性能提示中关于预分配输出的部分,预分配我正在尝试填写的主数组 (Tv(,https://docs.julialang.org/en/latest/manual/performance-tips/。(又节省了25秒左右(

剩下的最大的减速似乎来自add_vecs函数,它将两个较大矩阵的子数组相加。我尝试过去矢量化和使用 BLAS 函数,但无法产生更好的性能。

无论如何,改进的dynam_serial代码如下:

function add_vecs(r::Array{Float32},h::Int,j::Int,E::Array{Float32},exp_v::Array{Float32},beta::Float32)
@views r=E[:,h,j] + beta*exp_v[:,j]
return r
end

function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
v           = Array{Float32}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv          = Array{Float32}(gm,gz)
r           = Array{Float32}(gm)
# Set parameters for the loop
convcrit = 0.0001   # chosen convergence criterion
diff = 1          # arbitrary initial value greater than convcrit
while diff>convcrit
exp_v=v*Zprob'
for h=1:gm
for j=1:gz
@views Tv[h,j]=findmax(add_vecs(r,h,j,E,exp_v,beta))[1]
end
end
diff = maximum(abs,Tv - v)
v=copy(Tv)
end
return Tv
end

如果add_vecs似乎是关键函数,那么编写显式for循环可以提供更多优化。以下基准测试如何:

function add_vecs!(r::Array{Float32},h::Int,j::Int,E::Array{Float32},
exp_v::Array{Float32},beta::Float32)
@inbounds for i=1:size(E,1)
r[i]=E[i,h,j] + beta*exp_v[i,j]
end
return r
end

更新

为了继续优化dynam_serial我尝试删除更多分配。结果是:

function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
exp_v::Array{Float64},beta::Float64)
@inbounds for i=1:gm            
r[i] = E[i,h,j]+beta*exp_v[i,j]
end
return findmax(r)[1]
end
function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
beta::Float64,Zprob::Array{Float64})
v           = Array{Float64}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
r           = Array{Float64}(gm)
exp_v       = Array{Float64}(gm,gz)
# Set parameters for the loop
convcrit = 0.0001   # chosen convergence criterion
diff = 1.0 # arbitrary initial value greater than convcrit
while diff>convcrit
A_mul_Bt!(exp_v,v,Zprob)
diff = -Inf
for h=1:gm
for j=1:gz
oldv = v[h,j]
newv = add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
v[h,j]= newv
diff = max(diff, oldv-newv, newv-oldv)
end
end
end
return v
end

切换函数以使用 Float64 应该会提高速度(因为 CPU 本质上针对 64 位字长进行了优化(。此外,使用变异A_mul_Bt!直接保存另一个分配。通过切换数组vTv来避免copy(...)

这些优化如何改善您的运行时间?

第二次更新

更新了"更新"部分中的代码以使用findmax。此外,将dynam_serial更改为使用没有Tvv,因为除了diff计算之外,不需要保存旧版本,现在在循环内完成。

这是我复制粘贴的代码,由上面的Dan Getz提供。我完全按照我运行它们的方式包含数组和标量定义。性能为:运行@time dynam_serial(E,gm,gz,beta,Zprob)39.507005 seconds (11 allocations: 486.891 KiB)

using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]
r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = (1+r)^(-1)
sigma_range = 4
gz = 19 #15 #19
gp = 29 #19 #29
gk = 37 #25 #37
lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp.(lnz)
gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))
#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float64,N,N)
Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]
zstep = (Z[N] - Z[1]) / (N - 1)
for i=2:(N-1)
Z[i] = Z[1] + zstep * (i - 1)
end
for a = 1 : N
for b = 1 : N
if b == 1
Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
elseif b == N
Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
else
Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
end
end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float64,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)

gm = gk * gp
control=zeros(gm,2)
for i=1:gk
control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)
E=Array(Float64,gm,gm,gz)
for h=1:gm
for m=1:gm
for j=1:gz
# set the nonzero net debt indicator
if endog[h,2]<0
p_ind=1
else
p_ind=0
end
# set the investment indicator
if (control[m,1]-(1-delta)*endog[h,1])!=0
i_ind=1
else
i_ind=0
end
E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
(i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
s*endog[h,2]*p_ind
elem = E[m,h,j]
if E[m,h,j]<0
E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
else
E[m,h,j]=elem
end
end
end
end
function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
exp_v::Array{Float64},beta::Float64)
maxr = -Inf
@inbounds for i=1:gm            r[i] = E[i,h,j]+beta*exp_v[i,j]
maxr = max(r[i],maxr)
end
return maxr
end
function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
beta::Float64,Zprob::Array{Float64})
v           = Array{Float64}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv          = Array{Float64}(gm,gz)
r           = Array{Float64}(gm)
exp_v       = Array{Float64}(gm,gz)
# Set parameters for the loop
convcrit = 0.0001   # chosen convergence criterion
diff = 1.0 # arbitrary initial value greater than convcrit
while diff>convcrit
A_mul_Bt!(exp_v,v,Zprob)
diff = -Inf
for h=1:gm
for j=1:gz
Tv[h,j]=add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
diff = max(abs(Tv[h,j]-v[h,j]),diff)
end
end
(v,Tv)=(Tv,v)
end
return v
end

现在,这是算法和输入的另一个版本。这些函数类似于 Dan Getz 的建议,只是我使用findmax而不是迭代的max函数来查找数组最大值。在输入构造中,我同时使用Float32并将不同的位类型混合在一起。但是,我一直以这种方式获得更好的性能:24.905569 seconds (1.81 k allocations: 46.829 MiB, 0.01% gc time).但完全不清楚为什么。

using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]
r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = Float32((1+r)^(-1))
sigma_range = 4
gz = 19
gp = 29
gk = 37
lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp(lnz)
gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))
#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float32,N,N)
Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]
zstep = (Z[N] - Z[1]) / (N - 1)
for i=2:(N-1)
Z[i] = Z[1] + zstep * (i - 1)
end
for a = 1 : N
for b = 1 : N
if b == 1
Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
elseif b == N
Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
else
Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
end
end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float32,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)

gm = gk * gp
control=zeros(gm,2)
for i=1:gk
control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)
E=Array(Float32,gm,gm,gz)
for h=1:gm
for m=1:gm
for j=1:gz
# set the nonzero net debt indicator
if endog[h,2]<0
p_ind=1
else
p_ind=0
end
# set the investment indicator
if (control[m,1]-(1-delta)*endog[h,1])!=0
i_ind=1
else
i_ind=0
end
E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
(i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
s*endog[h,2]*p_ind
elem = E[m,h,j]
if E[m,h,j]<0
E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
else
E[m,h,j]=elem
end
end
end
end
function add_vecs!(gm::Int,r::Array{Float32},h::Int,j::Int,E::Array{Float32},
exp_v::Array{Float32},beta::Float32)
@inbounds @views for i=1:gm
r[i]=E[i,h,j] + beta*exp_v[i,j]
end
return r
end
function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
v           = Array{Float32}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv          = Array{Float32}(gm,gz)
# Set parameters for the loop
convcrit = 0.0001   # chosen convergence criterion
diff = 1.00000          # arbitrary initial value greater than convcrit
iter=0
exp_v=Array{Float32}(gm,gz)
r=Array{Float32}(gm)
while diff>convcrit

A_mul_Bt!(exp_v,v,Zprob)
for h=1:gm
for j=1:gz
Tv[h,j]=findmax(add_vecs!(gm,r,h,j,E,exp_v,beta))[1]
end
end
diff = maximum(abs,Tv - v)
(v,Tv)=(Tv,v)
end
return v
end

最新更新