我为任何可能需要它的人回答这个问题,但如果有更有效(但不难看(的方法可以做到这一点,请回答!
基本上,我只想对矩阵进行滑动运算。所以对于数据集
data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2
array([[ 0, 0, 0, 0],
[ 1, 1, 1, 1],
[ 4, 4, 4, 4],
[ 9, 9, 9, 9],
[16, 16, 16, 16],
[25, 25, 25, 25],
[36, 36, 36, 36],
[49, 49, 49, 49],
[64, 64, 64, 64],
[81, 81, 81, 81]])
窗口5(居中(的标准偏差将输出
nan
nan
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
nan
nan
或使用np.nanstd
1.699673171197595
3.5
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
16.80029761641144
13.072447700751718
如果将x
的N
样本上的方差重写为sum(x**2) - sum(x)**2 / N
,则可以使用带有boxcar内核的卷积来进行计算。例如:
import numpy as np
from scipy.signal import convolve
x = np.array([
[ 0, 0, 0, 0],
[ 1, 1, 1, 1],
[ 4, 4, 4, 4],
[ 9, 9, 9, 9],
[16, 16, 16, 16],
[25, 25, 25, 25],
[36, 36, 36, 36],
[49, 49, 49, 49],
[64, 64, 64, 64],
[81, 81, 81, 81]])
N = 5
kernel = np.ones((N, 1), 1)
sigma = np.sqrt(convolve(x**2, kernel, mode='valid') - convolve(x, kernel, mode='valid')**2 / N)
在这种情况下,我们得到的sigma
array([[13.19090596, 13.19090596, 13.19090596, 13.19090596],
[19.33907961, 19.33907961, 19.33907961, 19.33907961],
[25.57342371, 25.57342371, 25.57342371, 25.57342371],
[31.84336666, 31.84336666, 31.84336666, 31.84336666],
[38.13135193, 38.13135193, 38.13135193, 38.13135193],
[44.42971978, 44.42971978, 44.42971978, 44.42971978]])
虽然我确信scipy不再提供这种功能,但boxcar卷积可以优化为在O(n(时间内运行。
EDIT 1:根据@hpaulj的建议,a发现了一种使用np.lib.stride_tricks.sliding_window_view
的更快方法
def total_rolling_sliding_window_view(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid+shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
w = data_in.shape[1]
data_in = np.lib.stride_tricks.sliding_window_view(data_in, (win, w))
data_in = np.reshape(data_in, [-1, win*w])
data_out = operation_function(data_in, axis = 1)
return np.asarray(data_out)
def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid+shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
data_out = []
for k in range(data_in.shape[0]-win+1):
x = data_in[k:(k+win)]
data_out.append(operation_function(x))
return np.asarray(data_out)
确保它们输出相同的值
data_in = np.reshape(np.arange(10*4), (10, 4))**2
data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)
print(np.asarray([data_out_og, data_out_sliding_window_view]).T)
输出:
[[ nan nan]
[ nan nan]
[113.49471353 113.49471353]
[158.48359537 158.48359537]
[203.98296498 203.98296498]
[249.71393634 249.71393634]
[295.56902747 295.56902747]
[341.49824304 341.49824304]
[ nan nan]
[ nan nan]]
定时
w, h = 4, 10000
data_in = np.reshape(np.arange(h*w), (h, w))**2
%timeit -n 10 data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
%timeit -n 10 data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)
输出:
10 loops, best of 5: 256 ms per loop
10 loops, best of 5: 1.6 ms per loop
原始帖子:
def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid+shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
is_nan_inds = []
data_out = []
for k in range(data_in.shape[0]-win+1):
x = data_in[k:(k+win)]
data_out.append(operation_function(x))
is_nan_inds.append(np.any(np.isnan(x)))
return np.asarray(data_out), np.asarray(is_nan_inds)
data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2
data_out, is_nan_inds = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]
输出
nan True
nan True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
nan True
nan True
或者你可以定义你自己的功能
def custom_nanstd(x):
return np.sqrt(np.nanmean((x - np.nanmean(x))**2))
data_out, is_nan_inds = total_rolling_operation(data_in, 5, custom_nanstd, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]
输出
1.699673171197595 True
3.5 True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
16.80029761641144 True
13.072447700751718 True
如果你不希望窗口的要求是奇数(这确保了数据将集中在一个点上(,你可以重新配置为始终从左侧或右侧开始。