作为用于数据分析和数值计算的Python用户,而不是真正的"编码员",我一直缺少一种在多个内核上分发令人尴尬的并行循环计算的真正低开销的方法。据我所知,Numba曾经有prange结构,但由于"不稳定和性能问题"而被放弃。
使用新开源的@guvectorize装饰器,我找到了一种方法来使用它来模拟后期 prange 的功能,几乎没有开销。
我很高兴现在手头有这个工具,感谢Continuum Analytics的人,并且在网上没有找到任何明确提到这种@guvectorize使用的内容。虽然对于之前使用过 NumbaPro 的人来说这可能是微不足道的,但我为所有那些非编码人员发布这个(请参阅我对这个"问题"的回答)。
考虑下面的示例,其中以四种不同的方式执行一个两级嵌套的 for 循环,其中有一个核心执行一些涉及两个输入数组和一个循环索引函数的数值计算。每个变体都与Ipython的%timeit
魔法计时:
- naïve for loop,使用 numba.jit 编译
- 使用numba.guvectorize的类似结构,在单个线程中执行(
target = "cpu"
) - 使用 numba.guvectorize 的类似 forall 的构造,在与 CPU"内核"(在我的例子中为超线程)一样多的线程中执行(
target = "parallel"
) - 与 3 相同,但调用"guvectorized"forall 与"并行"循环索引序列随机排列
最后一个是因为(在此特定示例中)内部循环的范围取决于外部循环索引的值。我不知道 numpy 内部究竟是如何组织 gufunc 调用的,但似乎"并行"循环索引的随机化实现了稍微更好的负载平衡。
在我的(慢速)机器(第一代酷睿 i5、2 核、4 超线程)上,我得到了计时:
1 loop, best of 3: 8.19 s per loop
1 loop, best of 3: 8.27 s per loop
1 loop, best of 3: 4.6 s per loop
1 loop, best of 3: 3.46 s per loop
注意:如果这个配方很容易适用于 target="gpu"(它应该可以,但我现在无法访问合适的显卡),我会很感兴趣,以及加速是什么。请发帖!
下面是这个例子:
import numpy as np
from numba import jit, guvectorize, float64, int64
@jit
def naive_for_loop(some_input_array, another_input_array, result):
for i in range(result.shape[0]):
for k in range(some_input_array.shape[0] - i):
result[i] += some_input_array[k+i] * another_input_array[k] * np.sin(0.001 * (k+i))
@guvectorize([(float64[:],float64[:],int64[:],float64[:])],'(n),(n),()->()', nopython=True, target='parallel')
def forall_loop_body_parallel(some_input_array, another_input_array, loop_index, result):
i = loop_index[0] # just a shorthand
# do some nontrivial calculation involving elements from the input arrays and the loop index
for k in range(some_input_array.shape[0] - i):
result[0] += some_input_array[k+i] * another_input_array[k] * np.sin(0.001 * (k+i))
@guvectorize([(float64[:],float64[:],int64[:],float64[:])],'(n),(n),()->()', nopython=True, target='cpu')
def forall_loop_body_cpu(some_input_array, another_input_array, loop_index, result):
i = loop_index[0] # just a shorthand
# do some nontrivial calculation involving elements from the input arrays and the loop index
for k in range(some_input_array.shape[0] - i):
result[0] += some_input_array[k+i] * another_input_array[k] * np.sin(0.001 * (k+i))
arg_size = 20000
input_array_1 = np.random.rand(arg_size)
input_array_2 = np.random.rand(arg_size)
result_array = np.zeros_like(input_array_1)
# do single-threaded naive nested for loop
# reset result_array inside %timeit call
%timeit -r 3 result_array[:] = 0.0; naive_for_loop(input_array_1, input_array_2, result_array)
result_1 = result_array.copy()
# do single-threaded forall loop (loop indices in-order)
# reset result_array inside %timeit call
loop_indices = range(arg_size)
%timeit -r 3 result_array[:] = 0.0; forall_loop_body_cpu(input_array_1, input_array_2, loop_indices, result_array)
result_2 = result_array.copy()
# do multi-threaded forall loop (loop indices in-order)
# reset result_array inside %timeit call
loop_indices = range(arg_size)
%timeit -r 3 result_array[:] = 0.0; forall_loop_body_parallel(input_array_1, input_array_2, loop_indices, result_array)
result_3 = result_array.copy()
# do forall loop (loop indices scrambled for better load balancing)
# reset result_array inside %timeit call
loop_indices_scrambled = np.random.permutation(range(arg_size))
loop_indices_unscrambled = np.argsort(loop_indices_scrambled)
%timeit -r 3 result_array[:] = 0.0; forall_loop_body_parallel(input_array_1, input_array_2, loop_indices_scrambled, result_array)
result_4 = result_array[loop_indices_unscrambled].copy()
# check validity
print(np.all(result_1 == result_2))
print(np.all(result_1 == result_3))
print(np.all(result_1 == result_4))