在2D numpy矩阵上进行高效方便的滑动/滚动操作



我为任何可能需要它的人回答这个问题,但如果有更有效(但不难看(的方法可以做到这一点,请回答!

基本上,我只想对矩阵进行滑动运算。所以对于数据集

data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2

array([[ 0,  0,  0,  0],
[ 1,  1,  1,  1],
[ 4,  4,  4,  4],
[ 9,  9,  9,  9],
[16, 16, 16, 16],
[25, 25, 25, 25],
[36, 36, 36, 36],
[49, 49, 49, 49],
[64, 64, 64, 64],
[81, 81, 81, 81]])

窗口5(居中(的标准偏差将输出

nan
nan
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
nan
nan

或使用np.nanstd

1.699673171197595
3.5
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
16.80029761641144
13.072447700751718

如果将xN样本上的方差重写为sum(x**2) - sum(x)**2 / N,则可以使用带有boxcar内核的卷积来进行计算。例如:

import numpy as np
from scipy.signal import convolve
x = np.array([
[ 0,  0,  0,  0],
[ 1,  1,  1,  1],
[ 4,  4,  4,  4],
[ 9,  9,  9,  9],
[16, 16, 16, 16],
[25, 25, 25, 25],
[36, 36, 36, 36],
[49, 49, 49, 49],
[64, 64, 64, 64],
[81, 81, 81, 81]])
N = 5
kernel = np.ones((N, 1), 1)
sigma = np.sqrt(convolve(x**2, kernel, mode='valid') - convolve(x, kernel, mode='valid')**2 / N)

在这种情况下,我们得到的sigma

array([[13.19090596, 13.19090596, 13.19090596, 13.19090596],
[19.33907961, 19.33907961, 19.33907961, 19.33907961],
[25.57342371, 25.57342371, 25.57342371, 25.57342371],
[31.84336666, 31.84336666, 31.84336666, 31.84336666],
[38.13135193, 38.13135193, 38.13135193, 38.13135193],
[44.42971978, 44.42971978, 44.42971978, 44.42971978]])

虽然我确信scipy不再提供这种功能,但boxcar卷积可以优化为在O(n(时间内运行。

EDIT 1:根据@hpaulj的建议,a发现了一种使用np.lib.stride_tricks.sliding_window_view的更快方法

def total_rolling_sliding_window_view(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid+shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
w = data_in.shape[1]
data_in = np.lib.stride_tricks.sliding_window_view(data_in, (win, w))
data_in = np.reshape(data_in, [-1, win*w])
data_out = operation_function(data_in, axis = 1)
return np.asarray(data_out)

def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid+shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])

data_out = []
for k in range(data_in.shape[0]-win+1):
x = data_in[k:(k+win)] 
data_out.append(operation_function(x))
return np.asarray(data_out)

确保它们输出相同的值

data_in = np.reshape(np.arange(10*4), (10, 4))**2
data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)
print(np.asarray([data_out_og, data_out_sliding_window_view]).T)

输出:

[[         nan          nan]
[         nan          nan]
[113.49471353 113.49471353]
[158.48359537 158.48359537]
[203.98296498 203.98296498]
[249.71393634 249.71393634]
[295.56902747 295.56902747]
[341.49824304 341.49824304]
[         nan          nan]
[         nan          nan]]

定时

w, h = 4, 10000
data_in = np.reshape(np.arange(h*w), (h, w))**2
%timeit -n 10 data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
%timeit -n 10 data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)

输出:

10 loops, best of 5: 256 ms per loop
10 loops, best of 5: 1.6 ms per loop

原始帖子:

def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid+shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])

is_nan_inds = []
data_out = []
for k in range(data_in.shape[0]-win+1):
x = data_in[k:(k+win)] 
data_out.append(operation_function(x))
is_nan_inds.append(np.any(np.isnan(x)))
return np.asarray(data_out), np.asarray(is_nan_inds)
data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2
data_out, is_nan_inds = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]

输出

nan True
nan True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
nan True
nan True

或者你可以定义你自己的功能

def custom_nanstd(x):
return np.sqrt(np.nanmean((x - np.nanmean(x))**2))
data_out, is_nan_inds = total_rolling_operation(data_in, 5, custom_nanstd, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]

输出

1.699673171197595 True
3.5 True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
16.80029761641144 True
13.072447700751718 True

如果你不希望窗口的要求是奇数(这确保了数据将集中在一个点上(,你可以重新配置为始终从左侧或右侧开始。

最新更新