上下文:
我有3个3D阵列("precursor
阵列"),我正在用反向距离加权方法对其进行上采样。为此,我计算了一个3Dweights
数组,在precursor
数组的每个点上的for循环中使用该数组。
我的weights
阵列的每个2D切片用于计算partial
阵列。一旦我生成了所有28个,它们就会相加,得到一个最终的host
数组。
为了减少我的计算时间,我想将这个循环并行化。我尝试过这样做,但无法正确更新host
阵列。
问题:
我如何将我的主函数(代码的最后一部分)并行化?
编辑:或者有没有办法让我";切片";我的循环i
(例如,一个核心在i=0到5之间运行,一个内核在i=6到9上运行)?
摘要:
- 3个
precursor
阵列(温度、降水量、雪):10x4x7(10
是时间维度) - 1个
weight
阵列(w):28x1101x2101 - 28x3
partial
阵列:1101x2101 - 3个
host
阵列(temp、prec、Eprec):1101x2101
这是我的代码(除了MAIN ALGORITHM PARALLEL
部分之外,它是可运行的,请参阅末尾的MAIN ALGORITHM NOT PARALLEL
部分了解我的代码的非并行版本):
import numpy as np
import multiprocessing as mp
import time
#%% ------ Create data ------ ###
temperatures = np.random.rand(10,4,7)*100
precipitation = np.random.rand(10,4,7)
snow = np.random.rand(10,4,7)
# Array of altitudes to "adjust" the temperatures
alt = np.random.rand(4,7)*1000
#%% ------ Functions to run in parallel ------ ###
# This function upsamples the precursor arrays and creates the partial arrays
def interpolator(i, k, mx, my):
T = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000)) * w[k,:,:]
P = (precipitation[i,mx,my])*w[k,:,:]
S = (snow[i,mx,my])*w[k,:,:]
return(T, P, S)
# We add each partial array to each other to create the host array
def get_results(results):
global temp, prec, Eprec
temp += results[0]
prec += results[1]
Eprec += results[2]
#%% ------ IDW Interpolation ------ ###
# We create a weight matrix that we use to upsample our temperatures, precipitations and snow matrices
# This part is not that important, it works well as it is
MX,MY = np.shape(temperatures[0])
N = 300
T = np.zeros([N*MX+1, N*MY+1])
# create NxM inverse distance weight matrices based on Gaussian interpolation
x = np.arange(0,N*MX+1)
y = np.arange(0,N*MY+1)
X,Y = np.meshgrid(x,y)
k = 0
w = np.zeros([MX*MY,N*MX+1,N*MY+1])
for mx in range(MX):
for my in range(MY):
# Gaussian
add_point = np.exp(-((mx*N-X.T)**2+(my*N-Y.T)**2)/N**2)
w[k,:,:] += add_point
k += 1
sum_weights = np.sum(w, axis=0)
for k in range(MX*MY):
w[k,:,:] /= sum_weights
#%% ------ MAIN ALGORITHM PARALLEL ------ ###
if __name__ == '__main__':
# Create an empty array to use as a template
dummy = np.zeros((w.shape[1], w.shape[2]))
# Start a timer
ts = time.time()
# Iterate over the time dimension
for i in range(temperatures.shape[0]):
# Initialize the host arrays
temp = dummy.copy()
prec = dummy.copy()
Eprec = dummy.copy()
# Create the pool based on my amount of cores
pool = mp.Pool(mp.cpu_count())
# Loop through every weight slice, for every cell of the temperatures, precipitations and snow arrays
for k in range(0,w.shape[0]):
for mx in range(MX):
for my in range(MY):
# Upsample the temperatures, precipitations and snow arrays by adding the contribution of each weight slice
pool.apply_async(interpolator, args = (i, k, mx, my), callback = get_results)
pool.close()
pool.join()
# Print the time spent on the loop
print("Time spent: ", time.time()-ts)
#%% ------ MAIN ALGORITHM NOT PARALLEL ------ ###
if __name__ == '__main__':
# Create an empty array to use as a template
dummy = np.zeros((w.shape[1], w.shape[2]))
ts = time.time()
for i in range(temperatures.shape[0]):
# Create empty host arrays
temp = dummy.copy()
prec = dummy.copy()
Eprec = dummy.copy()
k = 0
for mx in range(MX):
for my in range(MY):
get_results(interpolator(i, k, mx, my))
k += 1
print("Time spent:", time.time()-ts)
多处理的问题是它会创建许多新进程,这些进程在主之前(即if __name__ == '__main__'
之前)执行代码。这导致初始化非常缓慢(因为所有进程都这样做),并且大量的RAM被白白使用。当然,你应该在主函数中移动所有内容,或者如果可能的话,在函数中移动一切(这通常会导致更快的执行,并且无论如何都是一个很好的软件工程实践,尤其是对于并行代码)。即便如此,多处理还有另一个巨大的问题:进程间通信很慢。一种解决方案是使用多线程方法,这是通过使用Numba或Cython实现的(您可以使用它们禁用GIL,而不是使用基本的CPython线程)。事实上,它们通常比多处理更易于使用。然而,您应该更加小心,因为并行访问是不受保护的,并且数据竞争可能出现在伪造的并行代码中。
在您的情况下,计算主要是内存限制。这意味着多处理非常无用。事实上,并行性在这里几乎没有用处,除非您在具有高吞吐量的计算服务器上运行此代码。事实上,内存是一种共享资源,使用更多的计算核心并没有多大帮助,因为一个核心几乎会使普通PC上的内存带宽饱和(而计算服务器上只需要很少的核心)。
加快内存绑定代码的关键是避免创建临时数组,并使用缓存友好的算法。在您的情况下,填充T
、P
和S
只是为了稍后读取,以便更新temp
、prec
和Eprec
阵列。这个临时步骤在这里是非常昂贵和必要的(尤其是填充数组)。删除此项将增加算术强度,从而生成一个顺序更快的代码,并且可以在多个内核上更好地缩放。我的机器就是这样。
以下是使用Numba使代码并行化的代码示例:
import numba as nb
# get_results + interpolator
@nb.njit('void(float64[:,::1], float64[:,::1], float64[:,::1], float64[:,:,::1], int_, int_, int_, int_)', parallel=True)
def interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my):
factor1 = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000))
factor2 = precipitation[i,mx,my]
factor3 = snow[i,mx,my]
for i in nb.prange(w.shape[1]):
for j in range(w.shape[2]):
val = w[k, i, j]
temp[i, j] += factor1 * val
prec[i, j] += factor2 * val
Eprec[i, j] += factor3 * val
# Example of usage:
interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)
请注意,nb.njit
中的字符串被称为签名,并指定JIT的类型,以便它可以急切地编译它。
在我的6核机器上,此代码的速度是4.6倍(而如果没有get_results
和interpolator
的合并,它的速度几乎没有)。事实上,它在顺序中的速度是原来的3.8倍,因此线程并没有多大帮助,因为计算是仍然内存受限。事实上,与内存读/写相比,乘法-加法的成本可以忽略不计。