使用numpy构造一个数组,其中从另一个2D数组中提取的行为2x2块



假设我有以下2D数组:

x = np.array([[10,20,30,40], [50,60,70,80],[90,100,110,120]])  
print(x)
array([[ 10,  20,  30,  40],
[ 50,  60,  70,  80],
[ 90, 100, 110, 120]])

我想构造一个新的数组y,其中每行都有来自x的2x2块的值,按顺时针顺序排列:

print(y)
array([[ 10,  20,  60,  50],
[ 20,  30,  70,  60],
[ 30,  40,  80,  70],
[ 50,  60,  100, 90],
[ 60,  70,  110, 100],
[ 70,  80,  120, 110]])

我可以使用Python实现如下循环:

n_rows, n_cols = x.shape
y = []
for i in range(n_rows-1): 
for j in range(n_cols-1): 
row = [x[i,j],x[i,j+1],x[i+1, j+1],x[i+1,j]] 
y.append(row) 
y = np.array(y)

我想知道是否有一种更快的方法可以利用Numpy函数并避免使用Python循环。

首先,在x中创建一个sliding_window_view,其中包含您想要看到的2x2个框:

b = np.lib.stride_tricks.sliding_window_view(x, (2, 2))

每个最里面的2x2数组都包含您想要的分解版本,但数组的第二部分颠倒了。到目前为止,我们没有复制任何数据。现在,通过移动最后一个维度来制作一个副本。由于b是高度不连续的,因此整形将始终在此处进行复制:

c = b.reshape(*b.shape[:2], 4)

交换最后两列:

c[..., 2:] = c[..., -1:1:-1]

现在浏览领先的维度:

y = c.reshape(-1, c.shape[-1])

如果你有一个版本为1.20的numpy,你可以用代替b的定义

b = np.lib.stride_tricks.as_strided(x, shape=(x.shape[0] - 1, x.shape[1] - 1, 2, 2), strides=x.strides * 2)

您可以缓存代码,因为循环主要是一次又一次地迭代相同的矩阵(如果您想用循环保持相同的代码(。我已经对缓存前后的代码进行了速度比较。

# Before caching
def loop_before_cache():
n_rows, n_cols = x.shape
y = []
for i in range(n_rows-1): 
for j in range(n_cols-1): 
row = [x[i,j],x[i,j+1],x[i+1, j+1],x[i+1,j]] 
y.append(row) 
return np.array(y)

%timeit loop_before_cache()
11.6 µs ± 318 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

现在有了缓存

# After caching
from functools import lru_cache
@lru_cache()
def loop_after_cache():
n_rows, n_cols = x.shape
y = []
for i in range(n_rows-1): 
for j in range(n_cols-1): 
row = [x[i,j],x[i,j+1],x[i+1, j+1],x[i+1,j]] 
y.append(row) 
return np.array(y)
%timeit loop_after_cache()
83.6 ns ± 2.42 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

额外

我使用range添加了带有(10005000(数组的模拟数据,以显示缓存的效率。

x = np.array([i for i in range(1,5000001)])
x = np.reshape(x, (1000,5000))
# Before caching
def loop_before_cache():
n_rows, n_cols = x.shape
y = []
for i in range(n_rows-1): 
for j in range(n_cols-1): 
row = [x[i,j],x[i,j+1],x[i+1, j+1],x[i+1,j]] 
y.append(row) 
return np.array(y)
%timeit loop_before_cache()
8.58 s ± 113 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# After caching
@lru_cache(maxsize = 256)
def loop_after_cache():
n_rows, n_cols = x.shape
y = []
for i in range(n_rows-1): 
for j in range(n_cols-1): 
row = [x[i,j],x[i,j+1],x[i+1, j+1],x[i+1,j]] 
y.append(row) 
return np.array(y)
%timeit loop_after_cache()
82.2 ns ± 5.58 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

最新更新