加速if-else梯形图C++



我有一段代码要求执行速度高于其他任何代码。通过使用std::chrono中的high_resolution_clock(),我发现将()切换到if-else()梯形图占用了我70%以上的执行时间。有什么办法加快速度吗?

我在编译过程中使用gcc-O3优化。

我研究了一个类似的问题:If else梯形图优化,但我不能使用return语句,因为它会退出外循环,而我不能。

switch(RPL_OPTION) {
case 0:
for(int k = 0; k < WINDOW_SIZE; k++) {
if(ans[k] >= upper_th) {
//Increasing flag counter
flag_count++;
//Adding the filtered value to the output vector
filtered_output.push_back(ans[k]);
flag_output.push_back(1);
} else if(ans[k] < lower_th) {
//Increasing flag counter
flag_count++;
//Adding the filtered value to the output vector
filtered_output.push_back(ans[k]);
flag_output.push_back(1);
} else {
//Adding the filtered value to the output vector
filtered_output.push_back(ans[k]);
flag_output.push_back(0);
}
}
break;
case 1:
for(int k = 0; k < WINDOW_SIZE; k++) {
if(ans[k] >= upper_th) {
//Increasing flag counter
flag_count++;
//Adding the filtered value to the output vector
filtered_output.push_back(RPL_CONST);
flag_output.push_back(1);
} else if(ans[k] < lower_th) {
//Increasing flag counter
flag_count++;
//Adding the filtered value to the output vector
filtered_output.push_back(RPL_CONST);
flag_output.push_back(1);
} else {
//Adding the filtered value to the output vector
filtered_output.push_back(ans[k]);
flag_output.push_back(0);
}
}
break;
case 2:
for(int k = 0; k < WINDOW_SIZE; k++) {
if(ans[k] >= upper_th) {
//Increasing flag counter
flag_count++;
//Adding the filtered value to the output vector
filtered_output.push_back(upper_th);
flag_output.push_back(1);
} else if(ans[k] < lower_th) {
//Increasing flag counter
flag_count++;
//Adding the filtered value to the output vector
filtered_output.push_back(lower_th);
flag_output.push_back(1);
} else {
//Adding the filtered value to the output vector
filtered_output.push_back(ans[k]);
flag_output.push_back(0);
}
}
break;
case 3:
//Generating a gaussian noise distribution with 0 mean and 1 std deviation
default_random_engine generator(time(0));
normal_distribution<float> dist(0,1);
for(int k = 0; k < WINDOW_SIZE; k++) {
if(ans[k] >= upper_th) {
//Increasing flag counter
flag_count++;
//Calling a random sample from the distribution and calculating a noise value
filtered_output.push_back(dist(generator)*sigma);
flag_output.push_back(1);
continue;
} else if(ans[k] < lower_th) {
//Increasing flag counter
flag_count++;
//Calling a random sample from the distribution and calculating a noise value
filtered_output.push_back(dist(generator)*sigma);
flag_output.push_back(1);
continue;
} else {
//Adding the filtered value to the output vector
filtered_output.push_back(ans[k]);
flag_output.push_back(0);
}
}
break;
}

想到的几个优化:

  1. vector.push_back()emplace_back(),即使使用reserve(),也会对性能造成危害,因为没有编译器能够对代码进行向量化。我们可以使用纯C指针,也可以只进行预分配。

  2. 如果重复调用此代码,则在最后一种情况下生成随机引擎和分发可能会产生巨大的成本。我们可以把它从代码中删除。请注意,这也将避免使用低分辨率时间函数进行重复初始化的问题。

  3. 这可能是不必要的,但稍微重写代码可能会允许更多的编译器优化,尤其是通过将内容转换为条件移动指令和减少分支数量。

/* TODO: We have better ways of initializing generators but that is
* unrelated to its performance
* I'm lazy and turn this into a static variable. Better use a
* different pattern (like up in the stack somewhere)
* but you get the idea
*/
static default_random_engine generator(time(0));
static normal_distribution<float> dist(0,1);
std::size_t output_pos = filtered_output.size();
filtered_output.resize(output_pos + WINDOW_SIZE);
flag_output.resize(output_pos + WINDOW_SIZE);
switch(RPL_OPTION) {
case 0:
for(int k = 0; k < WINDOW_SIZE; k++) {
auto ansk = ans[k];
int flag = (ansk >= upper_th) | (ansk < lower_th);
flag_count += flag;
filtered_output[output_pos + k] = ansk;
flag_output[output_pos + k] = flag;
}
break;
case 1:
for(int k = 0; k < WINDOW_SIZE; k++) {
auto ansk = ans[k];
int flag = (ansk >= upper_th) | (ansk < lower_th);
flag_count += flag;
// written carefully to help compiler turning this into a CMOV
auto filtered = flag ? RPL_CONST : ansk;
filtered_output[output_pos + k] = filtered;
flag_output[output_pos + k] = flag;
}
break;
case 2:
for(int k = 0; k < WINDOW_SIZE; k++) {
auto ansk = ans[k];
int flag = (ansk >= upper_th) | (ansk < lower_th);
flag_count += flag;
auto filtered = ansk < lower_th ? lower_th : ansk;
filtered = ansk >= upper_th ? upper_th : filtered;
filtered_output[output_pos + k] = filtered;
flag_output[output_pos + k] = flag;
}
break;
case 3:
for(int k = 0; k < WINDOW_SIZE; k++) {
// optimized under the assumption that flag is usually 1
auto ansk = ans[k];
auto random = dist(generator) * sigma;
int flag = (ansk >= upper_th) | (ansk < lower_th);
auto filtered = flag ? random : ansk;
filtered_output[output_pos + k] = filtered;
flag_output[output_pos + k] = flag;
}
break;
}

正在分析编译器输出

我用Godbolt检查了生成的代码。案例0-2确实进行了矢量化。然而,这在很大程度上取决于良好的别名检测。因此,这需要在包含此代码的完整函数的上下文中进行分析。特殊的痛点是

  • ansfiltered_output之间的潜在别名。这是很难避免的,但我认为编译器应该能够创建代码来检查
  • 阈值+RPL_CONSTfiltered_output之间的潜在别名。当有疑问时,将输入复制到本地变量中(编译器可以证明该变量没有别名)。仅仅将它们标记为const可能是不够的
  • flag_countflag_output之间的潜在别名,具体取决于类型。同样,最好使用局部变量进行计数,然后根据需要将其复制到其输出中

对于情况3,计算随机样本的成本足够高,如果输入通常在限制范围内,我的优化可能会降低性能。这需要制定基准。我想得越久,在错误预测中损失几个时钟周期的时间可能比不使用它计算样本的时间要短得多

删除冗余代码

生成的代码是高度冗余的。我们可以将开关情况转移到循环中,但这会干扰矢量化。相反,我们可以使用模板函数模式。


class Filter
{
int WINDOW_SIZE;
float upper_th, lower_th, sigma, RPL_CONST;
std::default_random_engine generator;
std::normal_distribution<float> dist;
template<class FilterOp>
int apply(std::vector<float>& filtered_output,
std::vector<int>& flag_output,
const std::vector<float>& ans, FilterOp filter)
{
// move stuff into local variables to help with alias detection
const int WINDOW_SIZE = this->WINDOW_SIZE;
const float upper_th = this->upper_th, lower_th = this->lower_th;
const std::size_t output_pos = filtered_output.size() - WINDOW_SIZE;
int flag_count = 0;
for(int k = 0; k < WINDOW_SIZE; k++) {
auto ansk = ans[k];
int flag = (ansk >= upper_th) | (ansk < lower_th);
flag_count += flag;
filtered_output[output_pos + k] = filter(ansk, flag);
flag_output[output_pos + k] = flag;
}
return flag_count;
}
public:
int operator()(int RPL_OPTION,
std::vector<float>& filtered_output,
std::vector<int>& flag_output,
const std::vector<float>& ans)
{
std::size_t output_pos = filtered_output.size();
filtered_output.resize(output_pos + WINDOW_SIZE);
flag_output.resize(output_pos + WINDOW_SIZE);
switch(RPL_OPTION) {
case 0:
return apply(filtered_output, flag_output, ans,
[](float ansk, int flag) noexcept -> float {
return ansk;
});
case 1:
return apply(filtered_output, flag_output, ans,
[RPL_CONST=this->RPL_CONST](float ansk, int flag) noexcept -> float {
return flag ? RPL_CONST : ansk;
});
case 2:
return apply(filtered_output, flag_output, ans,
[lower_th=this->lower_th, upper_th=this->upper_th](
float ansk, int flag) noexcept -> float {
auto filtered = ansk < lower_th ? lower_th : ansk;
return ansk >= upper_th ? upper_th : filtered;
});
case 3:
return apply(filtered_output, flag_output, ans,
[this](float ansk, int flag) noexcept -> float {
return flag ? dist(generator)*sigma : ansk;
});
default: return 0;
}
}
};

我几乎98%确信if-else梯形图不是问题所在。

std::vectors(或您使用的任何容器)push_back函数具有大量的重新分配和数据复制功能,对我来说是优化的主要候选者。

请使用reserve函数预先分配所需的内存。

然后去掉所有不变的东西,比如

default_random_engine generator(time(0));
normal_distribution<float> dist(0,1);

但如果没有更多的示例代码,就很难做出判断。

探查器会给你更好的结果。计时器功能在这里帮助不大。

我有一段代码要求执行速度高于其他任何

这表明了一种不明显的方法。从信号处理的角度来看,代码模式看起来足够熟悉,因此WINDOW_SIZE可能并不平凡。在这种情况下,使用AVX2进行打包比较是有意义的。

简而言之,您将一个完整的AVX2寄存器装满输入,使用两个AVX2寄存器存储下限和上限阈值的副本,并进行两次比较。这将为您提供两个输出,其中每个值都是0或~0

因此,您的标志计数由两个寄存器决定。数旗已经很诱人了,但这被认为是一个缓慢的"过程";水平相加";。最好在另一个AVX寄存器中跟踪这一点,并在最后进行一次水平加法。

filtered_output的更新取决于具体情况,但对于1和2,也可以使用AVX。可以使用mm256_blendv_epi8在基于第三寄存器中的位的两个值之间进行选择。您可以安全地忽略那里的8,这是最小分辨率(一个字节)。如果您正在进行32位比较,则结果寄存器也将包含32位结果,因此mm256_blendv_epi8将以4*8位分辨率工作。

如果您有case 0:,当然应该只是filtered_output的直接副本,在if语句之外。

首先要注意的是向量上的push_back。该代码没有显示对reserve的调用,因此这将随着矢量的不断增长而调整其大小。这可能比循环中的任何其他东西都贵。

接下来的事情涉及";if else梯形图":

你真的分析过梯子是否有问题吗?分支只有在预测失误时才是昂贵的。也许分支预测器对你的输入很有效?假设这个switch语句被执行多次,那么帮助它的一种方法就是对输入进行排序。然后,if-else阶梯不会每次都随机跳跃,而是在切换到新案例之前以相同的方式重复多次。但只有当循环运行多次或排序成本会抵消任何改进时,这才有帮助。

如果开关重复多次,您可以将输入分为3组,一次用于梯形图中的3个选项,并在没有任何if-else梯形图的情况下处理每组。

仔细看代码,我发现if-else梯形图中的前两种情况是相同的(除了情况2)。所以你可以把这些测试结合起来;如果其它":

if ((ans[k] >= upper_th) || (ans[k] < lower_th))

现在,由于延迟求值,这将产生与以前相同的代码。但你可以做得更好:

if ((ans[k] >= upper_th) | (ans[k] < lower_th))

现在,这两个部分都得到了评估,因为不存在对|的惰性评估。除了编译器是人造的傻瓜,可能只是做懒惰的评估。在这一点上,您正在与优化器作斗争。有些编译器会将其优化为两个分支,有些则保留为一个分支。

你可以在那里使用以下技巧:

static auto fn[] = {
[&]() { code for first choice; };
[&]() { code for second choice; };
};
fn[((ans[k] >= upper_th) | (ans[k] < lower_th))]();

通过将if的条件转换为计算数组的索引,可以避免编译器优化生成2个分支。有希望地至少在下一次编译器更新之前。:)

与优化器斗争时,每次更新编译器时都必须重新检查解决方案。

对于情况2,代码中的差异只是push_back的值。如果你使用,这可以在大多数架构上变成有条件的移动,而不是分支

(ans[k] >= upper_th) ? upper_th : lower_th;

用于push_back。

首先,由于台架预测,排序ans可能是个好主意。但它主要达到的大小。其次,如果您使用的是c++20,您可以查看[LIKELY]和[UNLIKELY]关键字。如果你能选择哪一条语句是最主要的还是相反的,你就可以很容易地使用它们。

相关内容

  • 没有找到相关文章

最新更新