使用 SIMD (AVX2) 进行稀疏阵列压缩



我有一个稀疏数组a(主要是零):

unsigned char a[1000000]; 

我想使用采用 AVX2 的英特尔 x64 架构上的 SIMD 指令,创建一个a b非零元素的索引数组。我正在寻找如何有效地做到这一点的技巧。具体来说,是否有 SIMD 指令来获取 SIMD 寄存器中连续排列的非零元素的位置?

计算非零指数的五种方法是:

  • 半矢量化循环:使用字符加载 SIMD 向量,与零进行比较并应用移动掩码。如果任何字符不为零,请使用小标量循环(也是@stgatilov建议的)。这适用于非常稀疏的数组。以下代码中的函数arr2ind_movmsk使用 BMI1 指令对于标量循环。

  • 矢量化循环:英特尔 Haswell 处理器和更新版本支持 BMI1 和 BMI2 指令集。BMI2 包含pext指令(并行位提取,见维基百科链接),事实证明,这在这里很有用。请参阅下面代码中的arr2ind_pext

  • 带有 if 语句的经典标量循环:arr2ind_if .

  • 没有分支的标量循环:arr2ind_cmov .

  • 查找表:@stgatilov显示可以使用查找表代替 pdep 和其他整数指示。这可能很好用,但是,查找表非常大:它不适合 L1 缓存。此处未测试。另请参阅此处的讨论。


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c
example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros:   
              ./a.out 20000 25
*/
#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h> 
#include <string.h>

__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0, k;
   __m256i msk;
   m0=0;
   for (i=0;i<n;i=i+32){                              /* Load 32 bytes and compare with zero:           */
      msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
      k=_mm256_movemask_epi8(msk);
      k=~k;                                           /* Search for nonzero bits instead of zero bits.  */
      while (k){
         ind[m0]=i+_tzcnt_u32(k);                     /* Count the number of trailing zero bits in k.   */
         m0++;
         k=_blsr_u32(k);                              /* Clear the lowest set bit in k.                 */
      }
   }
   *m=m0;
   return 0;
}

__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   uint64_t     cntr_const = 0xFEDCBA9876543210;
   __m256i      shft       = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
   __m256i      vmsk       = _mm256_set1_epi8(0x0F);
   __m256i      cnst16     = _mm256_set1_epi32(16);
   __m256i      shf_lo     = _mm256_set_epi8(0x80,0x80,0x80,0x0B,   0x80,0x80,0x80,0x03,   0x80,0x80,0x80,0x0A,   0x80,0x80,0x80,0x02,
                                             0x80,0x80,0x80,0x09,   0x80,0x80,0x80,0x01,   0x80,0x80,0x80,0x08,   0x80,0x80,0x80,0x00);
   __m256i      shf_hi     = _mm256_set_epi8(0x80,0x80,0x80,0x0F,   0x80,0x80,0x80,0x07,   0x80,0x80,0x80,0x0E,   0x80,0x80,0x80,0x06,
                                             0x80,0x80,0x80,0x0D,   0x80,0x80,0x80,0x05,   0x80,0x80,0x80,0x0C,   0x80,0x80,0x80,0x04);
   __m128i      pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80,  0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);                                            
   __m256i      i_vec      = _mm256_setzero_si256();
   m0=0;
   for (i=0;i<n;i=i+16){
      __m128i   v          = _mm_load_si128((__m128i *)&a[i]);                     /* Load 16 bytes.                                                                               */
      __m128i   msk        = _mm_cmpeq_epi8(v,_mm_setzero_si128());                /* Generate 16x8 bit mask.                                                                      */
                msk        = _mm_srli_epi64(msk,4);                                /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_shuffle_epi8(msk,pshufbcnst);                     /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_xor_si128(msk,_mm_set1_epi32(-1));                /* Invert 16x4 mask.                                                                            */
      uint64_t  msk64      = _mm_cvtsi128_si64x(msk);                              /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/
      int       p          = _mm_popcnt_u64(msk64)>>2;                             /* p is the number of nonzeros in 16 bytes of a.                                                */
      uint64_t  cntr       = _pext_u64(cntr_const,msk64);                          /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */
                                                                                   /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.                    */  
      __m256i   cntr256    = _mm256_set1_epi64x(cntr);
                cntr256    = _mm256_srlv_epi64(cntr256,shft);
                cntr256    = _mm256_and_si256(cntr256,vmsk);
      __m256i   cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
      __m256i   cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
                cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
                cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);
                             _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);     /* Note that the stores of iteration i and i+16 may overlap.                                                         */
                             _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi);   /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
                m0         = m0+p;
                i_vec      = _mm256_add_epi32(i_vec,cnst16);
   }
   *m=m0;
   return 0;
}

__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      if (a[i]!=0){
         ind[m0]=i;
         m0=m0+1;
      }
   }
   *m=m0;
   return 0;
}

__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      ind[m0]=i;
      m0=(a[i]==0)? m0 : m0+1;   /* Compiles to cmov instruction. */
   }
   *m=m0;
   return 0;
}

__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;
   for (i=0;i<m;i++) printf("i=%d,  ind[i]=%d   a[ind[i]]=%un",i,ind[i],a[ind[i]]);
   printf("n");   fflush( stdout );
   return 0;
}

__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;                              /* Compute a hash to compare the results of different methods. */
   unsigned int chk=0;
   for (i=0;i<m;i++){
      chk=((chk<<1)|(chk>>31))^(ind[i]);
   }
   printf("chk = %10Xn",chk);
   return 0;
}

int main(int argc, char **argv){
int n, i, m; 
unsigned int j, k, d;
unsigned char *a;
int *ind;
double t0,t1;
int meth, nrep;
char txt[30];
sscanf(argv[1],"%d",&n);            /* Length of array a.                                    */
n=n>>5;                             /* Adjust n to a multiple of 32.                         */
n=n<<5;
sscanf(argv[2],"%u",&d);            /* The approximate fraction of nonzeros in a is: d/1024  */
printf("n=%d,   d=%un",n,d);
a=_mm_malloc(n*sizeof(char),32);
ind=_mm_malloc(n*sizeof(int),32);    
                                    /* Generate a pseudo random array a.                     */
j=73659343;                   
for (i=0;i<n;i++){
   j=j*653+1;
   k=(j & 0x3FF00)>>8;              /* k is a pseudo random number between 0 and 1023        */
   if (k<d){
      a[i] = (j&0xFE)+1;            /* Set a[i] to nonzero.                                  */
   }else{
      a[i] = 0;
   }
}
/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d,   a[i]=%un",i,a[i]);}} printf("n");   */ /* Uncomment this line to print the nonzeros in a. */ 
char txt0[]="arr2ind_movmsk: ";
char txt1[]="arr2ind_pext:   ";
char txt2[]="arr2ind_if:     ";
char txt3[]="arr2ind_cmov:   ";
nrep=10000;                                   /* Repeat a function nrep times to make relatively accurate timings possible.                          */
                                              /* With nrep=1000000:   ./a.out 10016 4 ;   ./a.out 10016 48 ;   ./a.out 10016 519                    */
                                              /* With nrep=10000:     ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513                  */
printf("nrep = %d  nn",nrep);
arr2ind_movmsk(a,n,ind,&m);                   /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
for (meth=0;meth<4;meth++){
   t0=omp_get_wtime();
   switch (meth){
      case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);         strcpy(txt,txt0); break;
      case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);           strcpy(txt,txt1); break;
      case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);             strcpy(txt,txt2); break;
      case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);           strcpy(txt,txt3); break;
      default: ;
   }
   t1=omp_get_wtime();
   printf("method = %s  ",txt);
   /* print_chk(a,ind,m);  */
   printf(" elapsed time = %6.2fn",t1-t0);
}
print_nonz(a, ind, 2);                                            /* Do something with the results                 */
printf("density = %f %% nn",((double)m)/((double)n)*100);       /* Actual nonzero density of array a.            */
/* print_nonz(a, ind, m);    */  /* Uncomment this line to print the indices of the nonzeros.                      */
return 0;
}
/*
With nrep=1000000:
 ./a.out 10016 4 ;    ./a.out 10016 4 ;    ./a.out 10016 48 ;    ./a.out 10016 48 ;    ./a.out 10016 519  ;    ./a.out 10016 519       
With nrep=10000:  
 ./a.out 1000000 5 ;  ./a.out 1000000 5 ;  ./a.out 1000000 52 ;  ./a.out 1000000 52 ;  ./a.out 1000000 513 ;   ./a.out 1000000 513     
*/


代码以 n=10016

(数据适合 L1 缓存)和 n=1000000 的数组大小进行了测试,其中不同的非零密度约为0.5%、5%和50%。为了准确计时,函数称为 1000000和 10000 倍。


Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500
                     0.53%        5.1%       50.0%
arr2ind_movmsk:       0.27        0.53        4.89
arr2ind_pext:         1.44        1.59        1.45
arr2ind_if:           5.93        8.95       33.82
arr2ind_cmov:         6.82        6.83        6.82
Time in seconds, size n=1000000, 1e4 function calls.
                     0.49%        5.1%       50.1%
arr2ind_movmsk:       0.57        2.03        5.37
arr2ind_pext:         1.47        1.47        1.46
arr2ind_if:           5.88        8.98       38.59
arr2ind_cmov:         6.82        6.81        6.81


在这些示例中,矢量化循环比标量循环更快。arr2ind_movmsk的性能很大程度上取决于a的密度。它只是如果密度足够小,则比arr2ind_pext快。盈亏平衡点还取决于数组大小n。函数"arr2ind_if"显然在 50% 非零密度下分支预测失败。

如果您预计非零元素的数量非常低(即远小于 1%),那么您可以简单地检查每个 16 字节块是否为非零:

    int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128());
    if (mask != 65535) {
        //store zero bits of mask with scalar code
    }

如果好元素的百分比足够小,那么错误预测分支的成本和"if"中慢速标量代码的成本可以忽略不计。


对于一个好的通用解决方案,首先考虑流压缩的 SSE 实现。它从字节数组中删除所有零元素(想法取自这里):

__m128i shuf [65536]; //must be precomputed
char    cnt  [65536]; //must be precomputed
int compress(const char *src, int len, char *dst) {
    char *ptr = dst;
    for (int i = 0; i < len; i += 16) {
        __m128i reg = _mm_load_si128((__m128i*)&src[i]);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]);
        _mm_storeu_si128((__m128i*)ptr, compressed);
        ptr += cnt[mask];   //alternative:   ptr += 16-_mm_popcnt_u32(mask);
    }
    return ptr - dst;
}

如您所见,(_mm_shuffle_epi8+查找表)可以创造奇迹。我不知道任何其他方法可以矢量化结构复杂的代码,例如流压缩。


现在,您的请求唯一剩下的问题是您想要获取索引。每个索引必须以 4 字节值存储,因此 16 个输入字节的块可能会产生多达 64 字节的输出,这不适合单个 SSE 寄存器。

处理此问题的一种方法是诚实地将输出解压缩为 64 字节。因此,您将代码中的常量 (0,1,2,3,4,...,15) 替换reg,然后将 SSE 寄存器解压缩为 4 个寄存器,并添加一个具有 4 个i值的寄存器。这将需要更多的说明:6 个解包说明、4 个添加和 3 个商店(一个已经存在)。至于我,这是一个巨大的开销,特别是如果你期望不到25%的非零元素。

或者,您可以将单循环迭代处理的非零字节数限制为 4,以便一个寄存器始终足以用于输出。下面是示例代码:

__m128i shufMask [65536]; //must be precomputed
char    srcMove  [65536]; //must be precomputed
char    dstMove  [65536]; //must be precomputed
int compress_ids(const char *src, int len, int *dst) {
    const char *ptrSrc = src;
    int *ptrDst = dst;
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    __m128i base = _mm_setzero_si128();
    while (ptrSrc < src + len) {
        __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]);
        __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128());
        ids32 = _mm_add_epi32(ids32, base);
        _mm_storeu_si128((__m128i*)ptrDst, ids32);
        ptrDst += dstMove[mask];    //alternative:   ptrDst += min(16-_mm_popcnt_u32(mask), 4);
        ptrSrc += srcMove[mask];    //no alternative without LUT
        base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask]));
    }
    return ptrDst - dst;
}

这种方法的一个缺点是,现在每个后续循环迭代都无法启动,直到在上一次迭代上执行行ptrDst += dstMove[mask];。因此,关键路径急剧增加。硬件超线程或其手动仿真可以消除此损失。


因此,如您所见,这个基本思想有很多变体,所有这些都以不同程度的效率解决您的问题。如果您不喜欢 LUT,也可以减小它的大小(同样,以降低吞吐量性能为代价)。

这种方法不能完全扩展到更广泛的寄存器(即AVX2和AVX-512),但您可以尝试将多次连续迭代的指令组合成单个AVX2或AVX-512指令,从而略微增加吞吐量。

注意:我没有测试任何代码(因为正确预计算LUT需要明显的努力)。

虽然AVX2指令集有很多GATHER指令,但是它的性能太慢了。最有效的方法 - 手动处理数组。

最新更新