使用来自另一个数组的值有效地屏蔽和减少大型多维数组



我有一个具有两个三维(time, y, x)变量ab:的xarray.DataArray

import numpy as np
import xarray as xr
# Random data
a = np.random.rand(100, 3000, 3000).astype(np.float32)
b = np.random.rand(100, 3000, 3000).astype(np.float32)
# Create xarray.Dataset with two vars
ds = xr.Dataset(
data_vars={
"a": xr.DataArray(a, dims=("time", "y", "x")),
"b": xr.DataArray(b, dims=("time", "y", "x")),
}
)

当我的变量b在最小和最大阈值之间时,我需要计算atime维度上的中值。这些阈值针对每个x, y像素而变化(即,它们可以表示为二维(x, y)阵列):

random_vals = np.random.rand(1, 3000, 3000) / 10.0
min_threshold = 0.5 - random_vals
max_threshold = 0.5 + random_vals

目前,我通过识别b中处于阈值之间的像素来实现这一点,使用这个布尔数组使用xarray.where屏蔽a,然后最终计算a沿time维度的中值:

b_within_threshold = (ds.b > min_threshold) & (ds.b < max_threshold)
ds.a.where(b_within_threshold).median(dim='time')

这是可行的,但挑战是速度非常慢:本例为7.97 s ± 0 ns per loop(我的实际阵列可能要大得多:例如shape=(500, 5000, 5000))。在我的分析中,我需要对不同的最小/最大阈值集进行数百次计算,例如:

for i in np.linspace(0, 1, 100):

# Create thresholds
random_vals = np.random.rand(1, 3000, 3000) / 10.0
min_threshold = i - random_vals
max_threshold = i + random_vals

# Apply mask and compute median
b_within_threshold = (ds.b > min_threshold) & (ds.b < max_threshold)
ds.a.where(b_within_threshold).median(dim='time')

有没有一种更有效/更快的方法可以将这种计算应用于我的数据?我对xarraynumpypandas解决方案都很满意——考虑到我需要处理的数据量,即使在尝试使用multiprocessingDask并行我的代码时,我当前方法的速度也是不切实际的。

中值是一个相当昂贵的操作,因为它涉及(部分)对列表进行排序并从中选择中间值。因此,按照时间维度应用此操作,可以将数百万(短)列表排序数百次这只需要时间

您的解决方案在python级别上已经接近最佳,因此您仅有的两个选择是要么更改需求,要么优化恒定开销并并行化。有四件事你可以改进:

  • 如果可以,请选择mean而不是median,因为它的计算成本较低
  • 使用dask来并行化计算。你在评论中提到,你已经很熟悉了,所以我不会在这里做这件事
  • 使用numba或cython编译自己的内核,以避免昂贵的中间副本
  • 确保您的数据与您正在计算的维度对齐/连续。在这种情况下是time,所以要么切换到使用fortran有序数组,要么将时间作为数组的最后一个维度

具体来说,以下是您的解决方案与使用自己的(fortran对齐的)numba内核的时间安排:

Your Approach: 79.0010 s
Numba JIT: 7.7854 s

所以大约快10倍。请记住,这些都是单核定时,并行处理将进一步加快速度。以下是上述时间的代码:

import numpy as np
import xarray as xr
from timeit import timeit
import numba as nb

def time_solution(solution, number=1):
return timeit(
f"{solution.__name__}(data, threshold_value, low, high)",
setup=f"from __main__ import data, threshold_value, low, high, {solution.__name__}",
number=number,
)

def your_solution(data, threshold_value, low, high):
ds = xr.Dataset(
data_vars={
"a": xr.DataArray(data, dims=("time", "y", "x")),
"b": xr.DataArray(threshold_value, dims=("time", "y", "x")),
}
)
result = ds.a.where((ds.b > low) & (ds.b < high)).median(dim="time")
return result

@nb.jit(
"float32[:, :](float32[::1, :, :], float32[::1, :, :], float32[::1, :, :], float32[::1, :, :])",
nopython=True,
nogil=True,
)
def numba_magic(data, threshold_value, low, high):
output = np.empty(data.shape[1:], dtype=np.float32)
for height in range(data.shape[1]):
for width in range(data.shape[2]):
threshold = threshold_value[:, height, width]
mask = (low[:, height, width] < threshold) & (
threshold < high[:, height, width]
)
buffer = np.where(mask, data[:, height, width], np.nan)
output[height, width] = np.nanmedian(buffer)
return output

# Time solutions
# ==============
shape = (100, 3000, 3000)
rng = np.random.default_rng()
data = rng.random(shape).astype(np.float32, order="F")
threshold_value = rng.random(shape).astype(np.float32, order="F")
random_vals = (rng.random(shape) / 10).astype(np.float32, order="F")
low = 0.5 - random_vals
high = 0.5 + random_vals

# assert equality of solutions
expected = np.asarray(your_solution(data, threshold_value, low, high))
actual_numba = numba_magic(data, threshold_value, low, high)
assert np.allclose(expected, actual_numba, equal_nan=True)
# compare timings of solutions
repeats = 10
print("""
Timings
-------""")
print(f"Your Approach: {time_solution(your_solution, repeats)/repeats:.4f} s")
print(f"Numba JIT: {time_solution(numba_magic, repeats)/repeats:.4f} s")
  • 一个改进可以是在时间维度上对数组进行排序。这需要大量的前期计算成本,但一开始只有一次
  • 之后,您可以继续以相同的方式计算阈值并屏蔽a数组
  • 然后,不是通过调用中值函数,而是通过直接访问a数组中的中间元素来计算中值(如果数组长度为偶数,则分别为两个中间元素的平均值)
for i in np.linspace(0, 1, 100):
# Create thresholds
random_vals = np.random.rand(1, 3000, 3000) / 10.0
min_threshold = i - random_vals
max_threshold = i + random_vals
# Apply mask and compute median
b_within_threshold = (ds.b > min_threshold) & (ds.b < max_threshold)
a_masked = ds.a.where(b_within_threshold)
# Faster way to calculate median on a sorted array
len_a_masked = len(a_masked)
if len_a_masked == 0:
median = None
elif len_a_masked % 2 == 0:
median = 0.5 * (a_masked[(len_a_masked - 1) // 2] + a_masked[len_a_masked // 2])
else:
median = a_masked[(len_a_masked - 1) // 2]

根据计算的中位数数量,这应该是一个显著的改进,因为只对数组进行一次排序会带来额外的成本,但每次阈值迭代的中位数计算速度会更快,这是一个改进。

最新更新