使用并行处理从多维数组中积分出维



我希望找到一些聪明的方法来解决我一直在努力解决的并行处理问题。基本上,我要处理20,160个大小为(72,35,25,20)的多维数组。目前,我通过在嵌套的for循环中简单地做一个梯形积分,将尺寸为72的维度积分出来。我的最终目标是得到一个大小为(20160,35,25,20)的输出数组。

for idx,filename in enumerate(filenames):
#Read NetCDF Data File as 'raw_data'
flux=raw_data['FluxHydrogen'][:]   #This is size (72,35,25,20)
PA=raw_data['PitchAngleGrid'][:]   #This is size (72)
for i in range(35):
for j in range(25):
for k in range(20):
dir_flux=flux[:,i,j,k]
omni_flux=np.trapz(dir_flux*np.sin(PA),PA)
data[idx,i,j,k]=omni_flux   #This will have size (20160,35,25,20)

我相信在嵌套的for循环中实现并行化将是最有益的,但似乎不知道如何实现。我已经搜索了一些常见的问题,但是没有一个问题提供了足够的见解,说明如何实现共享内存,将多维数组传递给池,和/或重塑结果数组。如有任何帮助或见解,我将不胜感激。

您可以使用Numba为了大大加快代码的速度。Numba是一个JIT编译器,能够将基于numpy的代码编译为快速的本机代码(因此循环不是问题,实际上在Numba中使用循环是一个好主意)。

首先要做的是预计算一次np.sin(PA)为了避免重复计算。然后,dir_flux * np.sin(PA)可以使用for循环计算,结果可以存储在预分配的数组中。因此,不要执行数百万昂贵的小数组分配。外部循环可以使用多个线程来执行。使用prange和Numba标志parallel=True。它可以使用fastmath=True标志进一步加速,假设输入值不是特殊的(如NaN或Inf非常非常小:参见次正常数字)。

虽然理论上这应该足以获得一个快速的代码,但np.trapz的当前实现并不高效,因为它执行昂贵的分配。您可以很容易地重写该函数,以便不分配任何额外的数组。

下面是结果代码:
import numpy as np
import numba as nb
@nb.njit('(float64[::1], float64[::1])')
def trapz(y, x):
s = 0.0
for i in range(x.size-1):
dx = x[i+1] - x[i]
dy = y[i] + y[i+1]
s += dx * dy
return s * 0.5
@nb.njit('(float64[:,:,:,:], float64[:])', parallel=True)
def compute(flux, PA):
sl, si, sj, sk = flux.shape
assert sl == PA.size
data = np.empty((si, sj, sk))
flattenPA = np.ascontiguousarray(PA)
sinPA = np.sin(flattenPA)
for i in nb.prange(si):
tmp = np.empty(sl)
for j in range(sj):
for k in range(sk):
dir_flux = flux[:, i, j, k]
for l in range(sl):
tmp[l] = dir_flux[l] * sinPA[l]
omni_flux = trapz(tmp, flattenPA)
data[i, j, k] = omni_flux
return data
for idx,filename in enumerate(filenames):
# Read NetCDF Data File as 'raw_data'
flux=raw_data['FluxHydrogen'][:]   #This is size (72,35,25,20)
PA=raw_data['PitchAngleGrid'][:]   #This is size (72)
data[idx] = compute(flux, PA)

说明fluxPA必须为Numpy数组。还要注意,只要len(PA)相对较小,np.std(PA)不太大,trapz就是准确的。否则,成对求和或甚至(偏执的)Kahan求和应该有所帮助(注意Numpy使用成对求和)。在实践中,对随机正态数的结果是相同的。


进一步优化通过使flux访问更连续,可以使代码更快。一个有效的换位可以做到这一点(Numpy的换位不是有效的)。然而,在4D数组上做到这一点并不简单。另一种解决方案是在k维的整行上计算trapz操作。这使得计算非常高效,在我的机器上几乎没有内存限制。下面是代码:

@nb.njit('(float64[:,:,:,:], float64[:])', fastmath=True, parallel=True)
def compute(flux, PA):
sl, si, sj, sk = flux.shape
assert sl == PA.size
data = np.empty((si, sj, sk))
sinPA = np.sin(PA)
premultPA = PA * 0.5
for i in nb.prange(si):
for j in range(sj):
dir_flux = flux[:, i, j, :]
data[i, j, :].fill(0.0)
for l in range(sl-1):
dx = premultPA[l+1] - premultPA[l]
fact1 = dx * sinPA[l]
fact2 = dx * sinPA[l+1]
for k in range(sk):
data[i, j, k] += fact1 * dir_flux[l, k] + fact2 * dir_flux[l+1, k]
return data

请注意,预乘法使计算稍微不那么精确。

结果

下面是在我的6核机器(i5-9600KF处理器)上使用随机数(如@DominikStańczak)的结果:

Initial sequential solution:                       193.14 ms  (±  1.8 ms)
DominikStańczak sequential vectorized solution:      8.68 ms  (± 48.1 µs)
Numba parallel solution without fastmath:            0.48 ms  (±  6.7 µs)
Numba parallel solution without fastmath:            0.38 ms  (±  9.5 µs)
Best Numba solution (with fastmath):                 0.32 ms  (±  5.2 µs)
Optimal lower-bound execution:                       0.24 ms  (RAM bandwidth saturation)
因此,最快的Numba版本比(顺序)版本@DominikStańczak快27倍,比初始版本快604倍。. 这几乎是最优的。

作为第一步,让我们对代码本身进行矢量化。现在我将以每个文件为基础进行处理,以向您展示如何摆脱嵌套的for循环:

shape = (72, 35, 25, 20)
flux = np.random.normal(size=shape)
PA = np.random.normal(size=shape[0])

现在,计时你的实现,重写一点:

%%timeit
data = np.empty(shape[1:])
for i in range(shape[1]):
for j in range(shape[2]):
for k in range(shape[3]):
dir_flux=flux[:,i,j,k]
omni_flux=np.trapz(dir_flux*np.sin(PA),PA)
data[i,j,k]=omni_flux
# 211 ms ± 4.86 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

我的第一个想法是将sin拉出for循环,因为没有必要每次都重新计算它,但这让我得到了10ms的上限。但是,如果我们不使用for循环,而是通过广播使用普通numpy矢量化,将sin_PA转换为(72,1,1,1)形状的数组:

%%timeit
sin_PA = np.sin(PA).reshape(-1, 1, 1, 1)
data = np.trapz(flux * sin_PA, x=PA, axis=0)
# 9.03 ms ± 554 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

这是20倍的速度,没有什么可嘲笑的。我估计你所有的文件都需要三分钟。您还可以使用np.allclose来验证结果是否符合浮点误差。


如果你仍然需要并行化这个之后,我会使用dask.array事实上,如果你有你的netcdf4文件中的数据,我会使用xarray(这是有帮助的多维数据无论如何)来读取这些,然后运行trapz计算与后台启用Dask。我认为在这种情况下,这是实现简单多处理的最简单方法。这里有一个简短的草图:

import xarray
from Dask.distributed import Client
client = Client()
file_data = xarray.open_mfdataset(filenames, parallel=True)
# massage the data a little, probably
flux = file_data["FluxHydrogen"]
PA = file_data["PitchAngleGrid"]
integrand = flux * np.sin(PA)   # most element-wise numpy operations work on xarray ones or Dask based ones without a hitch
data = integrand.integrate(coord="PitchAngle")   # or some such name for the dimension you're integrating out

最新更新