我有一个稀疏数组a
(主要是零):
unsigned char a[1000000];
我想使用采用 AVX2 的英特尔 x64 架构上的 SIMD 指令,创建一个a
b
非零元素的索引数组。我正在寻找如何有效地做到这一点的技巧。具体来说,是否有 SIMD 指令来获取 SIMD 寄存器中连续排列的非零元素的位置?
计算非零指数的五种方法是:
-
半矢量化循环:使用字符加载 SIMD 向量,与零进行比较并应用移动掩码。如果任何字符不为零,请使用小标量循环(也是@stgatilov建议的)。这适用于非常稀疏的数组。以下代码中的函数
arr2ind_movmsk
使用 BMI1 指令对于标量循环。 -
矢量化循环:英特尔 Haswell 处理器和更新版本支持 BMI1 和 BMI2 指令集。BMI2 包含
pext
指令(并行位提取,见维基百科链接),事实证明,这在这里很有用。请参阅下面代码中的arr2ind_pext
。 -
带有 if 语句的经典标量循环:
arr2ind_if
. -
没有分支的标量循环:
arr2ind_cmov
. -
查找表:@stgatilov显示可以使用查找表代替 pdep 和其他整数指示。这可能很好用,但是,查找表非常大:它不适合 L1 缓存。此处未测试。另请参阅此处的讨论。
/*
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c
example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros:
./a.out 20000 25
*/
#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h>
#include <string.h>
__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0, k;
__m256i msk;
m0=0;
for (i=0;i<n;i=i+32){ /* Load 32 bytes and compare with zero: */
msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
k=_mm256_movemask_epi8(msk);
k=~k; /* Search for nonzero bits instead of zero bits. */
while (k){
ind[m0]=i+_tzcnt_u32(k); /* Count the number of trailing zero bits in k. */
m0++;
k=_blsr_u32(k); /* Clear the lowest set bit in k. */
}
}
*m=m0;
return 0;
}
__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0;
uint64_t cntr_const = 0xFEDCBA9876543210;
__m256i shft = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
__m256i vmsk = _mm256_set1_epi8(0x0F);
__m256i cnst16 = _mm256_set1_epi32(16);
__m256i shf_lo = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02,
0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00);
__m256i shf_hi = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06,
0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04);
__m128i pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);
__m256i i_vec = _mm256_setzero_si256();
m0=0;
for (i=0;i<n;i=i+16){
__m128i v = _mm_load_si128((__m128i *)&a[i]); /* Load 16 bytes. */
__m128i msk = _mm_cmpeq_epi8(v,_mm_setzero_si128()); /* Generate 16x8 bit mask. */
msk = _mm_srli_epi64(msk,4); /* Pack 16x8 bit mask to 16x4 bit mask. */
msk = _mm_shuffle_epi8(msk,pshufbcnst); /* Pack 16x8 bit mask to 16x4 bit mask. */
msk = _mm_xor_si128(msk,_mm_set1_epi32(-1)); /* Invert 16x4 mask. */
uint64_t msk64 = _mm_cvtsi128_si64x(msk); /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/
int p = _mm_popcnt_u64(msk64)>>2; /* p is the number of nonzeros in 16 bytes of a. */
uint64_t cntr = _pext_u64(cntr_const,msk64); /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */
/* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers. */
__m256i cntr256 = _mm256_set1_epi64x(cntr);
cntr256 = _mm256_srlv_epi64(cntr256,shft);
cntr256 = _mm256_and_si256(cntr256,vmsk);
__m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
__m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);
_mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo); /* Note that the stores of iteration i and i+16 may overlap. */
_mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */
m0 = m0+p;
i_vec = _mm256_add_epi32(i_vec,cnst16);
}
*m=m0;
return 0;
}
__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0;
m0=0;
for (i=0;i<n;i++){
if (a[i]!=0){
ind[m0]=i;
m0=m0+1;
}
}
*m=m0;
return 0;
}
__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0;
m0=0;
for (i=0;i<n;i++){
ind[m0]=i;
m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */
}
*m=m0;
return 0;
}
__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
int i;
for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%un",i,ind[i],a[ind[i]]);
printf("n"); fflush( stdout );
return 0;
}
__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
int i; /* Compute a hash to compare the results of different methods. */
unsigned int chk=0;
for (i=0;i<m;i++){
chk=((chk<<1)|(chk>>31))^(ind[i]);
}
printf("chk = %10Xn",chk);
return 0;
}
int main(int argc, char **argv){
int n, i, m;
unsigned int j, k, d;
unsigned char *a;
int *ind;
double t0,t1;
int meth, nrep;
char txt[30];
sscanf(argv[1],"%d",&n); /* Length of array a. */
n=n>>5; /* Adjust n to a multiple of 32. */
n=n<<5;
sscanf(argv[2],"%u",&d); /* The approximate fraction of nonzeros in a is: d/1024 */
printf("n=%d, d=%un",n,d);
a=_mm_malloc(n*sizeof(char),32);
ind=_mm_malloc(n*sizeof(int),32);
/* Generate a pseudo random array a. */
j=73659343;
for (i=0;i<n;i++){
j=j*653+1;
k=(j & 0x3FF00)>>8; /* k is a pseudo random number between 0 and 1023 */
if (k<d){
a[i] = (j&0xFE)+1; /* Set a[i] to nonzero. */
}else{
a[i] = 0;
}
}
/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%un",i,a[i]);}} printf("n"); */ /* Uncomment this line to print the nonzeros in a. */
char txt0[]="arr2ind_movmsk: ";
char txt1[]="arr2ind_pext: ";
char txt2[]="arr2ind_if: ";
char txt3[]="arr2ind_cmov: ";
nrep=10000; /* Repeat a function nrep times to make relatively accurate timings possible. */
/* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519 */
/* With nrep=10000: ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513 */
printf("nrep = %d nn",nrep);
arr2ind_movmsk(a,n,ind,&m); /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
for (meth=0;meth<4;meth++){
t0=omp_get_wtime();
switch (meth){
case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m); strcpy(txt,txt0); break;
case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m); strcpy(txt,txt1); break;
case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m); strcpy(txt,txt2); break;
case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m); strcpy(txt,txt3); break;
default: ;
}
t1=omp_get_wtime();
printf("method = %s ",txt);
/* print_chk(a,ind,m); */
printf(" elapsed time = %6.2fn",t1-t0);
}
print_nonz(a, ind, 2); /* Do something with the results */
printf("density = %f %% nn",((double)m)/((double)n)*100); /* Actual nonzero density of array a. */
/* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros. */
return 0;
}
/*
With nrep=1000000:
./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519
With nrep=10000:
./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513
*/
(数据适合 L1 缓存)和 n=1000000 的数组大小进行了测试,其中不同的非零密度约为0.5%、5%和50%。为了准确计时,函数称为 1000000和 10000 倍。
Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500
0.53% 5.1% 50.0%
arr2ind_movmsk: 0.27 0.53 4.89
arr2ind_pext: 1.44 1.59 1.45
arr2ind_if: 5.93 8.95 33.82
arr2ind_cmov: 6.82 6.83 6.82
Time in seconds, size n=1000000, 1e4 function calls.
0.49% 5.1% 50.1%
arr2ind_movmsk: 0.57 2.03 5.37
arr2ind_pext: 1.47 1.47 1.46
arr2ind_if: 5.88 8.98 38.59
arr2ind_cmov: 6.82 6.81 6.81
在这些示例中,矢量化循环比标量循环更快。arr2ind_movmsk
的性能很大程度上取决于a
的密度。它只是如果密度足够小,则比arr2ind_pext
快。盈亏平衡点还取决于数组大小n
。函数"arr2ind_if"显然在 50% 非零密度下分支预测失败。
如果您预计非零元素的数量非常低(即远小于 1%),那么您可以简单地检查每个 16 字节块是否为非零:
int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128());
if (mask != 65535) {
//store zero bits of mask with scalar code
}
如果好元素的百分比足够小,那么错误预测分支的成本和"if"中慢速标量代码的成本可以忽略不计。
对于一个好的通用解决方案,首先考虑流压缩的 SSE 实现。它从字节数组中删除所有零元素(想法取自这里):
__m128i shuf [65536]; //must be precomputed
char cnt [65536]; //must be precomputed
int compress(const char *src, int len, char *dst) {
char *ptr = dst;
for (int i = 0; i < len; i += 16) {
__m128i reg = _mm_load_si128((__m128i*)&src[i]);
__m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
int mask = _mm_movemask_epi8(zeroMask);
__m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]);
_mm_storeu_si128((__m128i*)ptr, compressed);
ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask);
}
return ptr - dst;
}
如您所见,(_mm_shuffle_epi8
+查找表)可以创造奇迹。我不知道任何其他方法可以矢量化结构复杂的代码,例如流压缩。
现在,您的请求唯一剩下的问题是您想要获取索引。每个索引必须以 4 字节值存储,因此 16 个输入字节的块可能会产生多达 64 字节的输出,这不适合单个 SSE 寄存器。
处理此问题的一种方法是诚实地将输出解压缩为 64 字节。因此,您将代码中的常量 (0,1,2,3,4,...,15) 替换reg
,然后将 SSE 寄存器解压缩为 4 个寄存器,并添加一个具有 4 个i
值的寄存器。这将需要更多的说明:6 个解包说明、4 个添加和 3 个商店(一个已经存在)。至于我,这是一个巨大的开销,特别是如果你期望不到25%的非零元素。
或者,您可以将单循环迭代处理的非零字节数限制为 4,以便一个寄存器始终足以用于输出。下面是示例代码:
__m128i shufMask [65536]; //must be precomputed
char srcMove [65536]; //must be precomputed
char dstMove [65536]; //must be precomputed
int compress_ids(const char *src, int len, int *dst) {
const char *ptrSrc = src;
int *ptrDst = dst;
__m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
__m128i base = _mm_setzero_si128();
while (ptrSrc < src + len) {
__m128i reg = _mm_loadu_si128((__m128i*)ptrSrc);
__m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
int mask = _mm_movemask_epi8(zeroMask);
__m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]);
__m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128());
ids32 = _mm_add_epi32(ids32, base);
_mm_storeu_si128((__m128i*)ptrDst, ids32);
ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4);
ptrSrc += srcMove[mask]; //no alternative without LUT
base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask]));
}
return ptrDst - dst;
}
这种方法的一个缺点是,现在每个后续循环迭代都无法启动,直到在上一次迭代上执行行ptrDst += dstMove[mask];
。因此,关键路径急剧增加。硬件超线程或其手动仿真可以消除此损失。
因此,如您所见,这个基本思想有很多变体,所有这些都以不同程度的效率解决您的问题。如果您不喜欢 LUT,也可以减小它的大小(同样,以降低吞吐量性能为代价)。
这种方法不能完全扩展到更广泛的寄存器(即AVX2和AVX-512),但您可以尝试将多次连续迭代的指令组合成单个AVX2或AVX-512指令,从而略微增加吞吐量。
注意:我没有测试任何代码(因为正确预计算LUT需要明显的努力)。
虽然AVX2指令集有很多GATHER指令,但是它的性能太慢了。最有效的方法 - 手动处理数组。