How to optimize an image-processing algorithm with 3 nested for loops in Python using NumPy, C, Numba (or CUDA)



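I wrote the routine below to select only the colored pixels in an image (those that are neither black nor white) and assign each of them a value according to a given color scale. The current code has 3 nested for loops and takes 61 seconds to run on my MacBook Pro 2015 (2.8 GHz quad-core Intel Core i7). My question is how to optimize the code so that it finishes faster:

  • 1.1 Should I vectorize it in numpy?
  • 1.2 Should I write an external function in C?
  • 1.3 Should I use Numba?
  • 1.4 I don't have a GPU at the moment, but I am thinking of buying one. Would a GPU with CUDA code run this kind of problem faster? If so, what is the approximate speedup factor relative to the other solutions (1.1, 1.2 or 1.3)?

Thank you very much!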

import numpy as np
import pdb
import time
list_scale_values = [[255, 255, 1], [255, 252, 0], [254, 251, 0], [253, 248, 0], [253, 245, 0], [253, 242, 0], [252, 241, 1], [251, 238, 1], [252, 234, 0], [251, 233, 1], [249, 231, 1], [248, 230, 0], [248, 227, 0], [248, 222, 1], [246, 217, 3], [243, 212, 0], [238, 206, 1], [236, 200, 0], [233, 194, 1], [230, 189, 1], [228, 184, 1], [226, 176, 1], [223, 170, 2], [219, 166, 2], [221, 159, 0], [218, 153, 0], [215, 147, 0], [213, 142, 0], [211, 135, 0], [208, 129, 0], [207, 123, 1], [203, 118, 1], [202, 112, 2], [197, 106, 1], [196, 100, 0], [193, 94, 0], [191, 87, 0], [188, 81, 0], [186, 76, 1], [183, 70, 0], [180, 65, 0], [178, 60, 0], [177, 53, 0], [173, 47, 0], [170, 41, 0], [168, 35, 0], [165, 30, 0], [163, 25, 0], [160, 17, 0], [158, 12, 0], [157, 10, 0], [153, 9, 0], [151, 8, 0], [148, 10, 0], [141, 8, 0], [140, 8, 3], [137, 8, 2], [132, 7, 1], [129, 7, 2], [126, 7, 3], [124, 6, 4], [121, 5, 5], [118, 6, 5], [115, 5, 4], [113, 5, 5], [109, 3, 3], [106, 3, 4], [102, 4, 5], [99, 3, 5], [94, 2, 3], [91, 2, 4], [88, 2, 3], [82, 3, 6], [77, 3, 4], [73, 3, 5], [69, 3, 5], [67, 3, 4], [63, 1, 4], [57, 1, 4], [56, 1, 4], [52, 0, 2], [47, 2, 0], [46, 0, 0], [34, 0, 0], [3, 0, 2], [1, 0, 12], [1, 1, 71], [4, 0, 76], [1, 2, 82], [2, 3, 86], [2, 3, 93], [3, 4, 97], [3, 6, 103], [2, 6, 106], [3, 6, 113], [3, 7, 115], [1, 8, 122], [2, 9, 125], [4, 10, 132], [3, 11, 136], [3, 12, 141], [3, 13, 145], [4, 13, 152], [4, 12, 155], [4, 14, 163], [4, 15, 167], [3, 16, 172], [4, 16, 176], [6, 21, 182], [6, 26, 185], [5, 30, 192], [5, 34, 196], [6, 38, 199], [8, 41, 204], [8, 46, 209], [7, 50, 214], [7, 54, 218], [7, 59, 221], [8, 63, 224], [12, 67, 228], [9, 72, 225], [13, 77, 227], [15, 81, 229], [17, 85, 228], [19, 90, 230], [22, 96, 231], [23, 100, 232], [25, 102, 234], [29, 107, 233], [31, 111, 234], [31, 116, 233], [35, 120, 236], [38, 124, 237], [40, 128, 238], [41, 132, 237], [42, 138, 240], [44, 142, 241], [46, 146, 242], [48, 151, 243], [50, 156, 244], [50, 159, 244], [53, 
164, 246], [56, 169, 247], [56, 174, 246], [60, 176, 249], [62, 180, 250], [63, 187, 251], [66, 190, 252], [68, 194, 253], [71, 197, 255], [73, 203, 255], [75, 207, 255], [78, 211, 255], [82, 213, 255], [87, 216, 255], [90, 218, 253], [94, 221, 254], [100, 223, 255], [105, 226, 255], [108, 230, 254], [112, 230, 255], [118, 234, 255], [121, 236, 255], [126, 239, 255], [131, 241, 255], [136, 242, 255], [142, 246, 255], [144, 248, 255]]
aliasing_velocity_cm_s = 46
scale_values_velocity = np.linspace(aliasing_velocity_cm_s, -aliasing_velocity_cm_s, num=len(list_scale_values))
list_scale_values_velocity = np.array(scale_values_velocity[:]).tolist()
# ensure black is zero
list_scale_values_velocity[int(len(list_scale_values_velocity)/2)] = 0
cropped_image = np.random.randint(256, size=(300, 300, 3)) # generate a 300x300 random image with 3 channels, each value in 0..255 (8 bits per channel)
velocity_image = np.zeros((cropped_image.shape[0], cropped_image.shape[1], 1))
start_time = time.time()
for i_frame in range(cropped_image.shape[0]):
    for j_frame in range(cropped_image.shape[1]):
        # a pixel counts as "colored" when its channels differ enough from each other
        isColor_score = abs(int(cropped_image[i_frame, j_frame, 0]) - int(cropped_image[i_frame, j_frame, 1])) + abs(
            int(cropped_image[i_frame, j_frame, 0]) - int(cropped_image[i_frame, j_frame, 2]))
        idx_list = 0
        score = 1000
        if isColor_score < 20:
            velocity_image[i_frame, j_frame] = 0
        else:
            # find the scale color closest to this pixel (sum of absolute differences)
            for z in range(len(list_scale_values)):
                score_pixel = abs(cropped_image[i_frame, j_frame, 0] - list_scale_values[z][0]) + abs(cropped_image[i_frame, j_frame, 1] - list_scale_values[z][1]) + abs(cropped_image[i_frame, j_frame, 2] - list_scale_values[z][2])
                if score_pixel < score:
                    score = score_pixel
                    idx_list = z
            velocity_image[i_frame, j_frame] = list_scale_values_velocity[idx_list]  # the velocity is still in cm/s

end = time.time()
elapsed = np.round(end - start_time, 2)
print('Operation finished in {} [s]!'.format(elapsed))

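There is a lot of optimization that can be done to this code before you need Numba or GPU offloading. For loops are among the slowest ways to compute things in Python, because every iteration carries substantial interpreter overhead. My suggestions: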

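1.1 Use list comprehensions instead of explicit for loops. In Python they are usually faster, although the gain is typically a modest constant factor rather than orders of magnitude.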

1.2 Another option is to avoid looping in Python altogether and let numpy compute the operation. For example:

color_scores = np.abs(cropped_image[:, :, 0] - cropped_image[:, :, 1]) + np.abs(cropped_image[:, :, 0] - cropped_image[:, :, 2])

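performs in a single operation the thousands of operations your code does inside the loop, in a fraction of the time. With this one change alone I measured a 100x speedup.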
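One caveat with the expression above: the question's test image comes from `np.random.randint` and is therefore a signed integer array, but images loaded from disk are often `uint8`, where subtraction wraps around instead of going negative. A minimal sketch, assuming 8-bit input; the helper name `color_scores` is illustrative and not part of the original answer:

```python
import numpy as np

def color_scores(image):
    """Per-pixel |R - G| + |R - B|, computed without unsigned wrap-around."""
    # Cast to a signed type first so that differences can be negative.
    img = np.asarray(image).astype(np.int16)
    return np.abs(img[..., 0] - img[..., 1]) + np.abs(img[..., 0] - img[..., 2])

# A single uint8 pixel with R=10, G=20, B=30.
pixel = np.array([[[10, 20, 30]]], dtype=np.uint8)

# Naive uint8 arithmetic: 10 - 20 wraps around to 246 instead of -10.
naive_diff = pixel[..., 0] - pixel[..., 1]

# Signed arithmetic gives the intended score: |10 - 20| + |10 - 30| = 30.
safe_score = color_scores(pixel)
```

If the array is already a signed integer type, the cast changes nothing, so it is cheap insurance.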

When it comes to performance in Python, rely on numpy functions wherever you can. They are written in C and are much faster than plain Python.
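The same idea can be extended to the whole task, including the search for the closest scale color, by broadcasting the image against the color table. This is a sketch under stated assumptions rather than the answerer's code: pixel values are assumed to lie in 0..255, the function name `extract_velocity_vectorized` and its parameters are hypothetical, and the result has shape (height, width) rather than (height, width, 1).

```python
import numpy as np

def extract_velocity_vectorized(image, palette, velocities, color_threshold=20):
    """Map each colored pixel to the velocity of its nearest palette color."""
    img = np.asarray(image).astype(np.int16)      # signed, so differences can be negative
    pal = np.asarray(palette, dtype=np.int16)     # shape (K, 3)
    vel = np.asarray(velocities, dtype=np.float64)

    r, g, b = img[..., 0], img[..., 1], img[..., 2]
    is_color = (np.abs(r - g) + np.abs(r - b)) >= color_threshold

    # Sum of absolute differences between every pixel and every palette color.
    # Accumulating channel by channel keeps the temporary at (H, W, K).
    dist = np.zeros(img.shape[:2] + (pal.shape[0],), dtype=np.int16)
    for c in range(3):
        dist += np.abs(img[:, :, c, None] - pal[:, c])

    # argmin returns the first minimum, like the strict "<" test in the loop.
    nearest = dist.argmin(axis=-1)
    return np.where(is_color, vel[nearest], 0.0)

# Tiny usage example with a two-color palette (red -> +10, blue -> -10).
demo_image = np.array([[[250, 5, 5], [128, 128, 128]],
                       [[0, 0, 200], [255, 255, 255]]])
demo_result = extract_velocity_vectorized(
    demo_image, [[255, 0, 0], [0, 0, 255]], [10.0, -10.0])
```

The trade-off is memory: the distance array holds one value per pixel per palette entry, so very large images may need to be processed in row blocks.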

OK, so first I moved the 3 nested for loops into a function, in script_plain_pytho.py:

import numpy as np
import pdb
def extract_velocity(cropped_image, velocity_image, arr_scale_values, arr_scale_values_velocity, score):
    # score is the initial (worst-case) distance, e.g. 1000
    height = cropped_image.shape[0]
    width = cropped_image.shape[1]
    len_arr_scale_values = len(arr_scale_values)
    for i_frame in range(height):
        for j_frame in range(width):
            isColor_score = np.abs(int(cropped_image[i_frame, j_frame, 0]) - int(cropped_image[i_frame, j_frame, 1])) + np.abs(
                int(cropped_image[i_frame, j_frame, 0]) - int(cropped_image[i_frame, j_frame, 2]))
            idx_list = 0
            best_score = score  # reset the best distance for every pixel
            if isColor_score < 20:
                velocity_image[i_frame, j_frame] = 0
            else:
                # find the scale color closest to this pixel
                for z in range(len_arr_scale_values):
                    score_pixel = np.abs(cropped_image[i_frame, j_frame, 0] - arr_scale_values[z][0]) + np.abs(cropped_image[i_frame, j_frame, 1] - arr_scale_values[z][1]) + np.abs(cropped_image[i_frame, j_frame, 2] - arr_scale_values[z][2])
                    if score_pixel < best_score:
                        best_score = score_pixel
                        idx_list = z
                velocity_image[i_frame, j_frame] = arr_scale_values_velocity[idx_list]  # the velocity is still in cm/s
    return velocity_image

Then I converted the function into a Cython function (saved in fastloop.pyx), following this excellent tutorial: https://nealhughes.net/cython1/

import numpy as np
import pdb
def extract_velocity(double[:,:,:] cropped_image, double[:,:,:] velocity_image, double[:,:] arr_scale_values, double[:] arr_scale_values_velocity, double score):
    cdef int height = cropped_image.shape[0]
    cdef int width = cropped_image.shape[1]
    cdef int len_arr_scale_values = arr_scale_values.shape[0]
    # typed loop counters let Cython compile the loops to plain C
    cdef int i_frame, j_frame, z, idx_list
    cdef double cropped_image_ij_0
    cdef double cropped_image_ij_1
    cdef double cropped_image_ij_2
    cdef double diff_01
    cdef double diff_02
    cdef double isColor_score
    cdef double arr_scale_values_z_0
    cdef double arr_scale_values_z_1
    cdef double arr_scale_values_z_2
    cdef double diff_03
    cdef double diff_04
    cdef double diff_05
    cdef double score_pixel
    cdef double best_score

    for i_frame in range(height):
        for j_frame in range(width):
            cropped_image_ij_0 = cropped_image[i_frame, j_frame, 0]
            cropped_image_ij_1 = cropped_image[i_frame, j_frame, 1]
            cropped_image_ij_2 = cropped_image[i_frame, j_frame, 2]
            # absolute differences written out by hand to avoid Python calls
            diff_01 = cropped_image_ij_0 - cropped_image_ij_1
            diff_02 = cropped_image_ij_0 - cropped_image_ij_2
            if diff_01 < 0:
                diff_01 = - diff_01
            if diff_02 < 0:
                diff_02 = - diff_02
            isColor_score = diff_01 + diff_02
            idx_list = 0
            best_score = score  # reset the best distance for every pixel
            if isColor_score < 20:
                velocity_image[i_frame, j_frame, 0] = 0
            else:
                for z in range(len_arr_scale_values):
                    arr_scale_values_z_0 = arr_scale_values[z, 0]
                    arr_scale_values_z_1 = arr_scale_values[z, 1]
                    arr_scale_values_z_2 = arr_scale_values[z, 2]
                    diff_03 = cropped_image_ij_0 - arr_scale_values_z_0
                    diff_04 = cropped_image_ij_1 - arr_scale_values_z_1
                    diff_05 = cropped_image_ij_2 - arr_scale_values_z_2
                    if diff_03 < 0:
                        diff_03 = - diff_03
                    if diff_04 < 0:
                        diff_04 = - diff_04
                    if diff_05 < 0:
                        diff_05 = - diff_05
                    score_pixel = diff_03 + diff_04 + diff_05
                    if score_pixel < best_score:
                        best_score = score_pixel
                        idx_list = z
                velocity_image[i_frame, j_frame, 0] = arr_scale_values_velocity[idx_list]  # the velocity is still in cm/s
    return velocity_image

To build it, you have to create a setup.py file:

# distutils was removed in Python 3.12; setuptools provides the same setup/Extension API
from setuptools import setup, Extension
from Cython.Build import cythonize

ext_modules = [
    Extension(
        "fastloop",
        ["fastloop.pyx"],
        libraries=["m"],
        extra_compile_args=["-ffast-math"],
    )
]
setup(
    name="fastloop",
    ext_modules=cythonize(ext_modules),
)

Then run the following from the command line:

python setup.py build_ext --inplace

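Finally, I imported the function in the main script. Note that the typed arguments declared as double require float64 arrays, so integer inputs must be converted first (for example with astype(np.float64)):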

velocity_image = np.array(fastloop.extract_velocity(cropped_image, velocity_image, arr_scale_values, arr_scale_values_velocity, score))

With this I got a speedup of more than 500x!!
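For comparison, option 1.3 from the question (Numba) lets the loops stay almost unchanged. The following is only a sketch, assuming Numba is installed and the inputs are integer NumPy arrays; the function name `extract_velocity_numba` and its parameters are hypothetical, and no timing is claimed for it. The `ImportError` fallback is there only so the example still runs, slowly, without Numba.

```python
import numpy as np

try:
    from numba import njit
except ImportError:
    # Fallback so the sketch still runs as plain Python without Numba.
    def njit(func):
        return func

@njit
def extract_velocity_numba(image, palette, velocities, color_threshold):
    """Same triple loop as the question, compiled by Numba when available."""
    height = image.shape[0]
    width = image.shape[1]
    out = np.zeros((height, width))
    for i in range(height):
        for j in range(width):
            r = image[i, j, 0]
            g = image[i, j, 1]
            b = image[i, j, 2]
            if abs(r - g) + abs(r - b) < color_threshold:
                continue  # grey pixel: velocity stays 0
            best_score = 1000  # larger than the maximum possible distance (765)
            best_idx = 0
            for z in range(palette.shape[0]):
                s = (abs(r - palette[z, 0]) + abs(g - palette[z, 1])
                     + abs(b - palette[z, 2]))
                if s < best_score:
                    best_score = s
                    best_idx = z
            out[i, j] = velocities[best_idx]
    return out

# Tiny usage example with a two-color palette (red -> +10, blue -> -10).
numba_image = np.array([[[250, 5, 5], [128, 128, 128]],
                        [[0, 0, 200], [255, 255, 255]]], dtype=np.int64)
numba_palette = np.array([[255, 0, 0], [0, 0, 255]], dtype=np.int64)
numba_velocities = np.array([10.0, -10.0])
numba_result = extract_velocity_numba(numba_image, numba_palette,
                                      numba_velocities, 20)
```

The first call includes compilation time, so any benchmark should time a second call.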

This nested for-loop algorithm is outdated. You could use an FFT, which numpy already implements for you, or even fftfast.
