我正在尝试通过Cythonization来加速矢量化numpy操作(或至少看看我是否可以)。该代码在给定两个距离矩阵(target_distances和map_distances从某个扁平坐标向量计算)和一些有关距离类型的信息(范围可以从 0 到 3,但对于我的第一次尝试,我认为在这种情况下它都是 0)。数字版本是 pycalculus.py:
import scipy
import numpy as np
from scipy.spatial import distance
def decay(x, s=10):
return scipy.special.expit(s*x)
def stress(z, target_distances, dim, distance_types, step,
nrows, ncols):
row_coords = np.reshape(z[:dim*nrows],(nrows,dim))
col_coords = np.reshape(z[dim*nrows:dim*(nrows+ncols)],(ncols,dim))
map_distances = distance.cdist(row_coords, col_coords).copy()
error = target_distances - map_distances
I0 = (distance_types==0) | (distance_types==3)
I2 = distance_types == 2
return np.sum(error[I0]**2) + np.sum((error[I2] + step)**2*decay(error[I2] + step))
在cythonized版本中,我的pyx文件如下(calculus.pyx)
import cython
cimport cython
from libc.stdlib cimport malloc, free
cdef extern from "ctools.h":
double stress (double*, double**, int**, double, int, int, int)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def stress_cython(double[:] z, double[:,:] target_distances, int [:,:]
distance_types, double step, int dim, int nrows, int ncols):
cdef int i
cdef int j
cdef int N = (nrows+ncols)*dim
z_C = <double*>malloc(sizeof(double)*N)
target_distances_C = <double **>malloc(sizeof(double*)*nrows)
distance_types_C = <int **>malloc(sizeof(int*)*nrows)
for i in range(N):
z_C[i] = z[i]
for i in range(nrows):
target_distances_C[i] = <double *>malloc(sizeof(double)*ncols)
distance_types_C[i] = <int *>malloc(sizeof(int)*ncols)
for j in range(ncols):
target_distances_C[i][j] = target_distances[i,j]
distance_types_C[i][j] = distance_types[i,j]
stress_val = stress(z_C, target_distances_C,
distance_types_C, step, nrows, ncols, dim)
for i in range(nrows):
free(target_distances_C[i])
free(distance_types_C[i])
free(target_distances_C)
free(distance_types_C)
return stress_val
而外部的ctools.c是
#include<stdio.h>
#include<math.h>
double decay(double x){
return 1/(1+exp(-10*x));
}
double** dist_pairs(double* z, int nrows, int ncols, int dims)
{
int i,j,d;
double coord1, coord2;
double** dist;
dist = (double **)malloc(sizeof(double*)*nrows);
for (i=0; i<nrows; i++){
dist[i] = (double *)malloc(sizeof(double)*ncols);
}
for (i=0; i<nrows; i++){
for (j=0; j<ncols; j++){
dist[i][j] = 0;
for (d=0; d<dims; d++){
coord1 = z[d*(nrows+ncols) + i];
coord2 = z[d*(nrows+ncols) + nrows + j];
dist[i][j] += pow(coord1-coord2,2.0) ;
}
dist[i][j] = sqrt(dist[i][j]);
}
}
return dist;
}
double stress(double* z, double **target_distances, int** distance_types,
double step, int nrows, int ncols, int dim){
int i,j;
double stress = 0.0;
double err = 0.0;
double **map_distances = dist_pairs(z, nrows, ncols, dim);
for (i=0; i<nrows; i++){
for (j=0; j<ncols; j++){
if (distance_types[i][j]==0 || distance_types[i][j]==3){
stress += pow(target_distances[i][j] - map_distances[i][j],2.0);
}
else if (distance_types[i][j]==2){
err = target_distances[i][j] - map_distances[i][j] + step;
stress += pow(step,2)*decay(step);
}
}
}
for (i=0; i<nrows; i++){
free(map_distances[i]);
}
free(map_distances);
return stress;
}
我做以下测试(test.py)
import numpy as np
import time
from calculus import stress_cython
from pycalculus import stress
nrows = 3000
ncols = 2000
dim = 2
N = 100
is_discrete = 1.0
dt0 = 0
dt1 = 0
difs = []
for i in range(N):
coordinates = np.random.rand(dim, nrows+ncols)
coordinates_flat = coordinates.flatten()
target_distances = np.random.rand(nrows, ncols)
distance_types = np.zeros((nrows,ncols), dtype='i')
t0 = time.time()
stress1 = stress_cython(coordinates_flat, target_distances, distance_types, is_discrete, dim, nrows,
ncols)
t1 = time.time()
stress2 = stress(coordinates.T.flatten(), target_distances, dim, distance_types, is_discrete,
nrows,ncols)
t2 = time.time()
dt0 += t1-t0
dt1 += t2-t1
difs.append(stress1-stress2)
print(f'cython:{dt0:.2f} python:{dt1:.2f}')
在我的机器中,时间非常相似(~4.47 vs ~4.62)。我不太明白为什么,在后台是否有一些并行计算进行numpy和scipy?
当我检查我的核心使用情况时,似乎并非如此。我尝试通过以下方式转换并行性
export MKL_NUM_THREADS=1
export NUMEXPR_NUM_THREADS=1
export OMP_NUM_THREADS=1
在运行 test.py 之前在外壳中,它没有区别。矢量化操作期间的并行性是我在这里缺少的,还是我只是想优化已经优化的例程,以便在这些库中进行最佳扩展?
ps1:我用arg"-ffast-math"编译cython代码
ps2:最后,我将让所有内核使用不同的初始启动条件,因此如果 numpy 自动并行化矢量化操作,它将对我无济于事。这就是我试图了解正在发生的事情的原因之一。
优化和指令集
首先,Cython 通常使用通常次优的-O2
编译(除了最新版本的 GCC 外,未启用矢量化)。-O3
可以提供更好的性能。此外,SSE 指令集默认在 x86-64 处理器上使用,而现在几乎所有指令集都支持 AVX(每个周期通常能够计算 2 倍以上的项目)。像-mavx
甚至-mavx2
/-mfma
这样的标志可以帮助提高支持时的性能(否则目标程序可能会崩溃)。可以使用-march=native
让编译器生成针对处理器优化的非可移植程序。Numpy 可以在运行时检测到这一点,并在需要时使用 AVX 指令(尽管并非所有操作都是如此)。
代码问题
目标编译器不能保证对pow(coord1-coord2, 2.0)
等操作进行优化。当然最好使用pow(coord1-coord2, 2)
,甚至(coord1-coord2) * (coord1-coord2)
.
double**
数据结构对于存储矩阵效率不高。最好分配一个大double*
扁平化大小width * height * sizeof(double)
数组。事实上,后者的好处是内存是连续的,读取/写入连续的数据通常更快。此外,dist[i][j]
可能会在内存中产生低效的间接寻址,而dist[i*ncols+j]
可能会导致指令更少/更快。事实上,它在并行代码中更为关键,因为malloc
往往不会使用多个内核进行扩展,因此您应该真正避免多次并行调用它。
如果dims
非常小(如 <8),则for (d=0; d<dims; d++)
效率不高。展开特定值的循环可以显著提高性能。或者,可以将此循环与基于 j 的循环交换,以便生成更快的代码(可能是矢量化)。把它放在第一位可能会更快。
是否有一些并行计算在后台为 Numpy 和 Scipy 进行?
目前,Numpy 不应该并行化任何接受基于 BLAS 的操作(例如矩阵乘法)。至于 Scipy,它是一回事,除了某些方法可以使用多个工作线程(非常罕见,可调并在文档中指定)。
您的代码可能受内存限制(更具体地说,由于缓存访问效率低下),因此访问内存中数据的方式非常重要。Numpy 可以比你的代码进行更有效的访问。
请注意,如果代码受内存带宽的约束,则它将无法很好地扩展(因为在内存已经饱和时,使用更多内核不会使内存访问更快)。
进一步优化(更新)
Cython 代码无缘无故地花费了大量时间复制数组,因为它们在这里是连续的。你可以把它传递给 C 函数,因为数组是未修改的。复制数据通常很慢,尤其是大型数组。
事实证明,即使提供了-O3
,编译器(GCC)在实践中也无法对dist_pairs
进行矢量化(而Numpy确实对代码进行了矢量化)。我试图修改代码以使矢量化变得微不足道,但即使在这种情况下,生成的代码效率也很低。解决方案是手动编写矢量化代码,即使它使代码不再可移植(仅在支持 AVX 的 x86-64 处理器上运行,即几乎所有不到 10 年的台式 PC)。下面是一个(未选中的)代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <x86intrin.h>
static inline double decay(double x){
return 1/(1+exp(-10*x));
}
double* dist_pairs(double* restrict z, int nrows, int ncols, int dims){
double* dist = malloc(sizeof(double) * nrows * ncols);
if (dims == 2){
// Fast 2D case
for (long i=0; i<nrows; i++){
int j;
int z_offset_d0 = nrows;
int z_offset_d1 = (nrows+ncols) + nrows;
__m256d v_coord1_d0 = _mm256_set1_pd(z[i]);
__m256d v_coord1_d1 = _mm256_set1_pd(z[(nrows+ncols) + i]);
// Vectorized code for x86-64 processors
for (j=0; j<ncols-3; j+=4){
__m256d v_coord2_d0 = _mm256_loadu_pd(&z[z_offset_d0 + j]);
__m256d v_coord2_d1 = _mm256_loadu_pd(&z[z_offset_d1 + j]);
__m256d v_tmp0 = _mm256_sub_pd(v_coord1_d0, v_coord2_d0);
__m256d v_tmp1 = _mm256_sub_pd(v_coord1_d1, v_coord2_d1);
v_tmp0 = _mm256_mul_pd(v_tmp0, v_tmp0);
v_tmp1 = _mm256_mul_pd(v_tmp1, v_tmp1);
__m256d v_res = _mm256_sqrt_pd(_mm256_sub_pd(v_tmp0, v_tmp1));
_mm256_storeu_pd(&dist[i*ncols + j], v_res);
}
// Remainder
for (; j<ncols; j++){
double coord1_d0 = z[i];
double coord1_d1 = z[(nrows+ncols) + i];
double coord2_d0 = z[nrows + j];
double coord2_d1 = z[(nrows+ncols) + nrows + j];
double tmp = (coord1_d0-coord2_d0) * (coord1_d0-coord2_d0);
tmp += (coord1_d1-coord2_d1) * (coord1_d1-coord2_d1);
dist[i*ncols+j] = sqrt(tmp);
}
}
}
else{
// General slow implementation
for (int i=0; i<nrows; i++){
for (int j=0; j<ncols; j++){
dist[i*ncols+j] = 0;
for (int d=0; d<dims; d++){
double coord1 = z[d*(nrows+ncols) + i];
double coord2 = z[d*(nrows+ncols) + nrows + j];
dist[i*ncols+j] += (coord1-coord2) * (coord1-coord2);
}
dist[i*ncols+j] = sqrt(dist[i*ncols+j]);
}
}
}
return dist;
}
double stress(double* z, double* target_distances, int* distance_types, double step, int nrows, int ncols, int dim){
double stress = 0.0;
double err = 0.0;
double* map_distances = dist_pairs(z, nrows, ncols, dim);
double step_decay = decay(step);
for (int i=0; i<nrows; i++){
for (int j=0; j<ncols; j++){
if (distance_types[i*ncols+j]==0 || distance_types[i*ncols+j]==3){
double tmp = target_distances[i*ncols+j] - map_distances[i*ncols+j];
stress += tmp * tmp;
}
else if (distance_types[i*ncols+j]==2){
err = target_distances[i*ncols+j] - map_distances[i*ncols+j] + step;
stress += step * step * step_decay;
}
}
}
(void)err; // Avoid a compiler warning
free(map_distances);
return stress;
}
import cython
cimport cython
from libc.stdlib cimport malloc, free
cdef extern from "ctools.h":
double stress (double*, double*, int*, double, int, int, int)
# Assume Numpy arrays are contiguous
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def stress_cython(double[::1] z, double[:,::1] target_distances, int [:,::1]
distance_types, double step, int dim, int nrows, int ncols):
cdef int i
cdef int j
cdef int N = (nrows+ncols)*dim
stress_val = stress(&z[0], &target_distances[0,0], &distance_types[0,0], step, nrows, ncols, dim)
return stress_val
这是结果时间(带-O3 -mavx -mfma
:
cython:1.29 python:4.45
请注意,dist_pairs
花了 60% 的时间计算平方根。其余的基本上是从内存加载/存储。加快平方根计算速度的唯一方法是 1。使用近似值或使用不太精确的数字(即。float
) 或 2。使用多个线程。
您可以通过合并dist_pairs
和stress
的循环以及逐行操作(以交错方式)来减少整体计算的内存限制。计算的行可以存储在快速 CPU 缓存中,而不是存储在较慢的主 RAM 中。生成的代码应受计算约束,并且可以很好地扩展。
如果这还不够,您可以尝试在能够有效计算双精度运算的服务器端GPU 上运行它,因为平方根的计算速度应该更快,并且设备内存通常也更快。