矢量化以实现性能



我想避免在下面的代码中使用 for 循环来实现性能。矢量化是否适合此类问题?

a = np.array([[0,1,2,3,4],
[5,6,7,8,9],
[0,1,2,3,4],
[5,6,7,8,9],
[0,1,2,3,4]],dtype= np.float32)
temp_a = np.copy(a)
for i in range(1,a.shape[0]-1):
for j in range(1,a.shape[1]-1):
if a[i,j] > 3:
temp_a[i+1,j] += a[i,j] / 5.
temp_a[i-1,j] += a[i,j] / 5.
temp_a[i,j+1] += a[i,j] / 5.
temp_a[i,j-1] += a[i,j] / 5.
temp_a[i,j]   -= a[i,j] * 4. / 5.
a = np.copy(temp_a)   

你基本上是在做卷积,对边界有一些特殊的处理。

请尝试以下操作:

from scipy.signal import convolve2d

# define your filter
f = np.array([[0.0, 0.2, 0.0],
[0.2,-0.8, 0.2],
[0.0, 0.2, 0.0]])
# select parts of 'a' to be used for convolution
b = (a * (a > 3))[1:-1, 1:-1]
# convolve, padding with zeros ('same' mode)
c = convolve2d(b, f, mode='same')
# add the convolved result to 'a', excluding borders
a[1:-1, 1:-1] += c
# treat the special cases of the borders
a[0, 1:-1] += .2 * b[0, :]
a[-1, 1:-1] += .2 * b[-1, :]
a[1:-1, 0] += .2 * b[:, 0]
a[1:-1, -1] += .2 * b[:, -1]

它给出以下结果,与嵌套循环相同。

[[  0.    2.2   3.4   4.6   4. ]
[  6.2   2.6   4.2   3.   10.6]
[  0.    3.4   4.8   6.2   4. ]
[  6.2   2.6   4.2   3.   10.6]
[  0.    2.2   3.4   4.6   4. ]]

我的跟踪使用 3 个过滤器,rot90np.wherenp.sumnp.multiply。我不确定哪种基准测试方式更合理。如果不考虑创建过滤器的时间,它大约快 4 倍。

# Each filter basically does what `op` tries to achieve in a loop
filter1 = np.array([[0, 1 ,0, 0, 0],
[1, -4, 1, 0, 0],
[0, 1, 0, 0, 0],
[0, 0, 0, 0, 0],
[0, 0, 0, 0, 0]]) /5.
filter2 = np.array([[0, 0 ,1, 0, 0],
[0, 1, -4, 1, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 0, 0],
[0, 0, 0, 0, 0]]) /5.
filter3 = np.array([[0, 0 ,0, 0, 0],
[0, 0, 1, 0, 0],
[0, 1, -4, 1, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 0, 0]]) /5.
# only loop over the center of the matrix, a
center = np.array([[0, 0, 0, 0, 0],
[0, 1, 1, 1, 0],
[0, 1, 1, 1, 0],
[0, 1, 1, 1, 0],
[0, 0, 0, 0, 0]])

filter1filter2可以旋转以分别代表 4 个过滤器。

filter1_90_rot = np.rot90(filter1, k=1)
filter1_180_rot = np.rot90(filter1, k=2)
filter1_270_rot = np.rot90(filter1, k=3)
filter2_90_rot = np.rot90(filter2, k=1)
filter2_180_rot = np.rot90(filter2, k=2)
filter2_270_rot = np.rot90(filter2, k=3)
# Based on different index from `a` return different filter
filter_dict = {
(1,1): filter1,
(3,1): filter1_90_rot,
(3,3): filter1_180_rot,
(1,3): filter1_270_rot,
(1,2): filter2,
(2,1): filter2_90_rot,
(3,2): filter2_180_rot,
(2,3): filter2_270_rot,
(2,2): filter3
}

主要功能

def get_new_a(a):
x, y = np.where(((a > 3) * center) > 0) # find pairs that match the condition
return a + np.sum(np.multiply(filter_dict[i, j], a[i,j])
for (i, j) in zip(x,y))

注意:似乎存在一些数字错误,因此np.equal()大多会在我的结果和 OP 之间返回False,而np.close()会返回 true。

计时结果

def op():
temp_a = np.copy(a)
for i in range(1,a.shape[0]-1):
for j in range(1,a.shape[1]-1):
if a[i,j] > 3:
temp_a[i+1,j] += a[i,j] / 5.
temp_a[i-1,j] += a[i,j] / 5.
temp_a[i,j+1] += a[i,j] / 5.
temp_a[i,j-1] += a[i,j] / 5.
temp_a[i,j]   -= a[i,j] * 4. / 5.
a2 = np.copy(temp_a)   
%timeit op()
167 µs ± 2.72 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit get_new_a(a):
37.2 µs ± 2.68 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each) 

再次注意,我们忽略了创建过滤器的时间,因为我认为这将是一次性的事情。如果您确实想包括创建过滤器的时间,它大约快两倍。您可能会认为这不公平,因为op的方法包含两个np.copy。我认为,op方法的瓶颈是for循环。

参考:

numpy.multiply两个矩阵之间进行元素乘法。
np.rot90为我们轮换。k是一个参数,您可以决定旋转多少次。np.isclose可以使用此函数来检查两个矩阵是否在您可以定义的某个误差内接近。

我想出了这个解决方案:

a = np.array([[0,0,0,0,0],
[0,6,2,8,0],
[0,1,5,3,0],
[0,6,7,8,0],
[0,0,0,0,0]],dtype= np.float32)
up= np.zeros_like(a)
down= np.zeros_like(a)
right= np.zeros_like(a)
left = np.zeros_like(a)
def new_a(a,u,r,d,l):
c = np.copy(a)
c[c <= 3] = 0
up[:-2, 1:-1] += c[1:-1,1:-1] / 5.
down[2:, 1:-1] += c[1:-1,1:-1] / 5.
left[1:-1, :-2] += c[1:-1,1:-1]/ 5.
right[1:-1, 2:] += c[1:-1,1:-1] / 5.
a[1:-1,1:-1] -= c[1:-1,1:-1] * 4. / 5.
a += up + down + left + right
return a

最新更新