C-如何从AVX Intinsics中获得性能提高以计算基本统计



我的问题是关于使用AVX指令与幼稚的方法的性能。

我从天真的方法中得到我的AVX方法的答案,并正确地回答,但是使用AVX的说明需要更长的时间,所以我想知道我在做什么错/效率低下的矢量代码。

这个问题太复杂了,无法提供一个独立的编译单元,我为此感到抱歉。但是,我希望在下面有功能代码片段,我希望这是相当简单且风格的,希望很容易遵循手头的问题。

一些环境详细信息:

  • 这两种方法均与Clang(Apple LLVM版本8.1.0(Clang-802.0.42(一起编译(。
  • 我正在用-mavx标志编译。
  • 我的硬件(MacBook Pro带有Intel Core i7处理器(声称支持AVX说明。

我有一个程序,其中用户提供了一个多行文本文件,每行包含一个逗号分隔的数字字符串,即 n 二维向量的列表,其中 n 对于文件是任意的,但是(禁止输入不良输入(是每行相同的值 n

例如:

0,4,6,1,2,22,0,2,30,...,39,14,0,3,3,3,1,3,0,3,2,1
0,0,1,1,0,0,0,8,0,1,...,6,0,0,4,0,0,0,0,7,0,8,2,0
...
1,0,1,0,1,0,0,2,0,1,...,2,0,0,0,0,0,2,1,1,0,2,0,0

i从这些向量的比较中产生了一些统计得分,例如皮尔逊相关性,但是得分函数可以是简单的,例如算术均值。

幼稚的方法

这些向量中的每个向量都被放入指向 signal_t的结构的指针中:

typedef struct signal {
    uint32_t n;
    score_t* data;
    score_t mean;
} signal_t;

score_t类型只是 float的类型:

typedef float score_t;

开始,我将字符串解析为float(score_t(值,并计算算术平均值:

signal_t* s = NULL;
s = malloc(sizeof(signal_t));
if (!s) {
    fprintf(stderr, "Error: Could not allocate space for signal pointer!n");
    exit(EXIT_FAILURE);
}
s->n = 1;
s->data = NULL;
s->mean = NAN;
for (uint32_t idx = 0; idx < strlen(vector_string); idx++) {
    if (vector_string[idx] == ',') {
        s->n++;
    }
}
s->data = malloc(sizeof(*s->data) * s->n);
if (!s->data) {
    fprintf(stderr, "Error: Could not allocate space for signal data pointer!n");
    exit(EXIT_FAILURE);
}
char* start = vector_string;
char* end = vector_string;
char entry_buf[ENTRY_MAX_LEN];
uint32_t entry_idx = 0;
bool finished_parsing = false;
bool data_contains_nan = false;
do {
    end = strchr(start, ',');
    if (!end) {
        end = vector_string + strlen(vector_string);
        finished_parsing = true;
    }
    memcpy(entry_buf, start, end - start);
    entry_buf[end - start] = '';
    sscanf(entry_buf, "%f", &s->data[entry_idx++]);
    if (isnan(s->data[entry_idx - 1])) {
        data_contains_nan = true;
    }
    start = end + 1;
} while (!finished_parsing);
if (!data_contains_nan) {
    s->mean = pt_mean_signal(s->data, s->n);
}

算术平均值非常简单:

score_t pt_mean_signal(score_t* d, uint32_t len)
{
    score_t s = 0.0f;
    for (uint32_t idx = 0; idx < len; idx++) {
        s += d[idx];
    }
    return s / len;
}

幼稚的性能

在10K矢量串文件上运行这种方法时,我的运行时间为6.58。

AVX方法

我有一个修改的signal_t结构,称为signal_avx_t

typedef struct signal_avx {
    uint32_t n_raw;
    uint32_t n;
    __m256* data;
    score_t mean;
} signal_avx_t;

这将指针存储到__m256地址。每个__m256存储八个单精度float值。为了方便起见,我定义了一个称为AVX_FLOAT_N的常数来存储此倍数,例如:

#define AVX_FLOAT_N 8

这是我如何解析矢量串并将其存储在__m256中。它与幼稚的方法非常相似,除非现在我一次将八个值读为缓冲区,将缓冲区写入__m256,然后重复直到没有更多的值来编写。然后,我计算平均值:

signal_avx_t* s = NULL;
s = malloc(sizeof(signal_avx_t));
if (!s) {
fprintf(stderr, "Error: Could not allocate space for signal_avx pointer!n");
    exit(EXIT_FAILURE);
}
s->n_raw = 1;
s->n = 0;
s->data = NULL;
s->mean = NAN;
for (uint32_t idx = 0; idx < strlen(vector_string); idx++) {
    if (vector_string[idx] == ',') {
        s->n_raw++;
    }
}
score_t signal_buf[AVX_FLOAT_N];
s->n = (uint32_t) ceil((float)(s->n_raw) / AVX_FLOAT_N);
s->data = malloc(sizeof(*s->data) * s->n);
if (!s->data) {
    fprintf(stderr, "Error: Could not allocate space for signal_avx data pointer!n");
    exit(EXIT_FAILURE);
}
char* start = id;
char* end = id;
char entry_buf[ENTRY_MAX_LEN];
uint32_t entry_idx = 0;
uint32_t data_idx = 0;
bool finished_parsing = false;
bool data_contains_nan = false;
do {
    end = strchr(start, ',');
    if (!end) {
        end = vector_string + strlen(vector_string);
        finished_parsing = true;
    }
    memcpy(entry_buf, start, end - start);
    entry_buf[end - start] = '';
    sscanf(entry_buf, "%f", &signal_buf[entry_idx++ % AVX_FLOAT_N]);
    if (isnan(signal_buf[(entry_idx - 1) % AVX_FLOAT_N])) {
        data_contains_nan = true;
    }
    start = end + 1;
    /* I write every eight floats to an __m256 chunk of memory */
    if (entry_idx % AVX_FLOAT_N == 0) {
        s->data[data_idx++] = _mm256_setr_ps(signal_buf[0],
                                             signal_buf[1],
                                             signal_buf[2],
                                             signal_buf[3],
                                             signal_buf[4],
                                             signal_buf[5],
                                             signal_buf[6],
                                             signal_buf[7]);
    }
} while (!finished_parsing);
if (!data_contains_nan) {
    /* write any leftover floats to the last `__m256` */
    if (entry_idx % AVX_FLOAT_N != 0) {
        for (uint32_t idx = entry_idx % AVX_FLOAT_N; idx < AVX_FLOAT_N; idx++) {
            signal_buf[idx] = 0;
        }
        s->data[data_idx++] = _mm256_setr_ps(signal_buf[0],
                                             signal_buf[1],
                                             signal_buf[2],
                                             signal_buf[3],
                                             signal_buf[4],
                                             signal_buf[5],
                                             signal_buf[6],
                                             signal_buf[7]);
    }
    s->mean = pt_mean_signal_avx(s->data, s->n, s->n_raw);
}

AVX平均功能

这是我写的函数,以生成算术平均值:

score_t pt_mean_signal_avx(__m256* d, uint32_t len, uint32_t len_raw)
{
    score_t s = 0.0f;
    /* initialize a zero-value vector to collect summed value */
    __m256 v_sum = _mm256_setzero_ps();
    /* add data to collector */
    for (uint32_t idx = 0; idx < len; idx++) {
        v_sum = _mm256_add_ps(v_sum, d[idx]);
    }
    /* sum the collector values */
    score_t* res = (score_t*)&v_sum;
    for (uint32_t idx = 0; idx < AVX_FLOAT_N; idx++) {
        s += res[idx];
    }
    return s / len_raw;
}

AVX性能

在10K矢量串文件上运行基于AVX的方法时,我的运行时间为6.86,速度慢了约5%。无论输入的大小如何,这种差异大致是恒定的。

摘要

我的期望是,通过使用AVX说明并通过矢量化循环,我会感到速度颠簸,并不是说性能会更糟。

代码段中是否有任何建议滥用__m256数据类型和相关的固有函数,以计算基本摘要统计信息?

主要是,我想在这里弄清楚我在做什么错,然后再发展较大的数据集之间的更复杂的评分功能。感谢您的任何建设性建议!

首先,我希望我们同意,将文本放置在浮子上可能比算术均值要多得多,甚至没有提及从物理上的文件中读取数据的数据贮存。如果您要进行基准测试,则绝对应该省略阅读和解析。

这里似乎主要的问题是您在阅读时要成为并进行矢量化。实际上,您正在做的是从signal_bufs的数据的不必要副本。

您必须意识到__mm256_*实际上不是内存数据类型。这只是一个宏,以确保您使用256bit值的内存地址和寄存器。

因此,只需将您的signal_buf__mm256_load_ps加载到SIMD寄存器中,然后在其上进行AVX魔术,或者顺序填充s sscanf,然后进行相同的__mm256_load_ps魔术。

我真的不明白您为什么使用setr。为什么需要扭转算术平均值的要素的顺序?还是这是您的"穷人的负载指导"?

再次,您的浮点数学工作,尤其是如果您编写编译器甚至可以自动矢量化的代码,这不是在这里花费时间。这是弦的解析。

volk(矢量优化的内核库(有很多手写的SIMD内核,其中包括累积浮子数组的一个:

https://github.com/gnuradio/volk/blob/master/kernels/kernels/volk/volk_32f_accumulator_s32f.h

AVX代码看起来像这样:

static inline void
volk_32f_accumulator_s32f_a_avx(float* result, const float* inputBuffer, unsigned int num_points)
{
  float returnValue = 0;
  unsigned int number = 0;
  const unsigned int eighthPoints = num_points / 8;
  const float* aPtr = inputBuffer;
  __VOLK_ATTR_ALIGNED(32) float tempBuffer[8];
  __m256 accumulator = _mm256_setzero_ps();
  __m256 aVal = _mm256_setzero_ps();
  for(;number < eighthPoints; number++){
    aVal = _mm256_load_ps(aPtr);
    accumulator = _mm256_add_ps(accumulator, aVal);
    aPtr += 8;
  }
  _mm256_store_ps(tempBuffer, accumulator);
  returnValue = tempBuffer[0];
  returnValue += tempBuffer[1];
  returnValue += tempBuffer[2];
  returnValue += tempBuffer[3];
  returnValue += tempBuffer[4];
  returnValue += tempBuffer[5];
  returnValue += tempBuffer[6];
  returnValue += tempBuffer[7];
  number = eighthPoints * 8;
  for(;number < num_points; number++){
    returnValue += (*aPtr++);
  }
  *result = returnValue;
}

它的作用是拥有一个八个元素累加器,它不断地添加八个新元素的集合(单独(,然后最终返回有关这八个蓄能器的总和。/p>

矢量零件以外的效率很多(@marcus在他的答案中解决了(。

不要为signal_t* s动态分配空间。这是一个很小的固定尺寸结构,您只需要其中一个,因此您应该只使用signal_t s(自动存储(并删除间接水平。


进行任何转换之前,最好不要扫描,的整个字符串,因为如果字符串不适合L1缓存(32K(,那么在转换时,您会在数据重用时丢失。

如果您不能只是即时概括它(例如均值(,则分配一个大缓冲区以放入转换的数据。如果您在不填充缓冲区的情况下到达字符串的末端,那就很好。realloc将其降低到大小。(您分配但从未触摸的内存页面在大多数OS上基本上都是免费的。(如果您在到达字符串结束之前填充初始缓冲区大小,请用realloc将其增长两个。(指数尺寸增加摊销至o(1(附加元素的平均成本,这就是为什么C 的std::vector使用这种方式。

如果您知道字符串的全长(例如,从文件大小(,可以使用它来估算您需要的缓冲区的大小。(例如,假设每个浮子长度为2个字节,包括',',因为它们至少会长时间,并且可以在理性的情况下分配过度分配。(


如果您真的想在转换之前对逗号进行计数,则可以将其矢量化_mm_cmpeq_epi8以在字符串数据的向量中找到逗号,而_mm_add_epi8可以将其汇总为0/-1的向量。使用_mm_sad_epu8至少每255个向量避免溢出,将8位元素的水平总和到64位元素。


如果您的数据就像您的简单示例一样,每个数字实际上都是1位整数,则可以做比使用scanf将其转换为float更好。例如您可以使用Integer Simd将ASCII数字从0到9的整数转换为整数。

// if we don't know the string length ahead of time,
// we could look for a '' on the fly with _mm256_cmpeq_epi8 / _mm256_movemask_epi
uint64_t digitstring_sum(const char*p, size_t len)
{
    const char *endp = p+len - 31;  // up to the last full-vector of string data
    __m256i sum = _mm256_setzero_si256();
    for ( ; p < endp ; p+=32 ) {
        __m256i stringdata = _mm256_loadu_si256((const __m256i*)p);
        __m256i integers = _mm256_sub_epi16(stringdata, _mm256_set1_epi16( '0'+(','<<8) ));  // turn "1,2,3,..." into 0x0100 0200 0300...
        // horizontal sum the 8-bit elements into 64-bit elements
        // There are various ways to optimize this by doing this part less frequently, but still often enough to avoid overflow.  Or doing it a different way.
        __m256i hsum = _mm256_sad_epu8(integers, _mm256_setzero_si256());  // sum(x[0] - 0, x[1] - 0, ...) = sum (x[...])
        sum = _mm256_add_epi64(sum, hsum);
    }
    // sum holds 4x 64-bit accumulators.  Horizontal sum that:
    // (this is probably more efficient than storing to memory and looping, but just barely for a vector of only 4 elements)
    __m128i hi = _mm256_extract_si128(sum, 1);
    __m128i lo = _mm256_castsi256_si128(sum);
    __m128i t1 = _mm_add_epi64(lo, hi);
    __m128i t2 = _mm_unpackhi_epi64(t1,t1);  // copy high 64 bit element to low
    __m128i t3 = _mm_add_epi64(t1, t2);
    uint64_t scalar_sum = _mm_cvtsi128_si32(t3);
    // Then a cleanup loop to handle the last partial-vector of string data.
    // or do it with an unaligned vector and some masking...
    return scalar_sum;
}

您的某些示例具有多数数字的数字,但仍然只有整数。您可以通过使用矢量比较来查找逗号的位置,以4组的组为4组的矢量并将该位图用作整数索引中的整数索引中的shuffle-control矢量。

这真的很复杂,但是请参阅 @Stgatilov关于使用此技术将虚拟Quad ipv4地址字符串解析为32位整数的答案。由于pshufb(_mm_shuffle_epi8(在两个单独的车道中运行,因此您最好只使用128位向量。

您想在循环中执行此操作,然后一次通过4个整数移动。给定比较屏蔽结果作为整数,您可以通过剥离前4套套件,然后使用位扫描指令/固有的内容来找到第五逗号的位置。可以使用BMI1 _blsr_u32 4次完成前4套套件。(使用单个指令进行dst = (a - 1) & a(。

或者无论如何您都需要一个用于混战媒介的LUT,因此您可以使用LUT条目中的某些不保守钻头来容纳字节计数。

最新更新