将执行循环拆分为多个线程 (1-N-1-N-1..)



考虑这种情况:

for (...)
{
const size_t count = ...
for (size_t i = 0; i < count; ++i)
{
calculate(i); // thread-safe function
}
}

使用 C++17 和/或提升最大限度地提高性能的最优雅的解决方案是什么?

循环的"创建 + 连接"线程毫无意义,因为开销巨大(在我的情况下,这完全等于可能的收益)。

所以我只需要创建 N 个线程一次,并使它们与主线程保持同步(使用:互斥、shared_mutex、condition_variable、原子等)。对于这种常见和清晰的情况(为了使一切真正安全和快速),这似乎是相当艰巨的任务。白天坚持下去,我有一种"发明自行车"的感觉......

  • 更新 1:calculate(x) 和 calculate(y) 可以(并且应该)运行 平行
  • 更新 2:std::atomic::fetch_add(或 smth.)更可取 比队列(或 SMTH)。
  • 更新3:极端计算(即数百万个"外部"调用和数百个"内部"调用)
  • 更新 4:calculate() 更改内部对象的数据而不返回值

中间溶液

出于某种原因,"异步 + 等待"比"创建 + 加入"线程快得多。所以这两个例子使速度提高了100%:

例 1

for (...)
{
const size_t count = ...
future<void> execution[cpu_cores];
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x] = async(launch::async, ref(*this), x, count);
}
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x].wait();
}
}
void operator()(const size_t x, const size_t count)
{
for (size_t i = x; i < count; i += cpu_cores)
{
calculate(i);
}
}

例 2

for (...)
{
index = 0;
const size_t count = ...
future<void> execution[cpu_cores];
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x] = async(launch::async, ref(*this), count);
}
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x].wait();
}
}
atomic<size_t> index;
void operator()(const size_t count)
{
for (size_t i = index.fetch_add(1); i < count; i = index.fetch_add(1))
{
calculate(i);
}
}

是否可以通过仅创建一次线程,然后以较小的开销同步它们来使其更快?

最终解决方案

与标准::异步相比,速度提高了+20%!

for (size_t i = 0; i < _countof(index); ++i) { index[i] = i; }
for_each_n(par_unseq, index, count, [&](const size_t i) { calculate(i); });

是否可以避免冗余阵列"索引"?

是的:

for_each_n(par_unseq, counting_iterator<size_t>(0), count,
[&](const size_t i)
{
calculate(i);
});

过去,您会使用 OpenMP、GNU Parallel、Intel TBB。

如果你有 c++17²,我建议将执行策略与标准算法一起使用。

这真的比你自己做事要好,尽管它

  • 需要一些深思熟虑来选择适合标准算法的类型
  • 如果您知道引擎盖下会发生什么,仍然会有所帮助

这是一个简单的例子,事不宜迟:

实时编译器资源管理器

#include <thread>
#include <algorithm>
#include <random>
#include <execution>
#include <iostream>
using namespace std::chrono_literals;
static size_t s_random_seed = std::random_device{}();
static auto generate_param() {
static std::mt19937 prng {s_random_seed};
static std::uniform_int_distribution<> dist;
return dist(prng);
}
struct Task {
Task(int p = generate_param()) : param(p), output(0) {}
int param;
int output;
struct ByParam  { bool operator()(Task const& a, Task const& b) const { return a.param < b.param; } };
struct ByOutput { bool operator()(Task const& a, Task const& b) const { return a.output < b.output; } };
};
static void calculate(Task& task) {
//std::this_thread::sleep_for(1us);
task.output = task.param ^ 0xf0f0f0f0;
}
int main(int argc, char** argv) {
if (argc>1) {
s_random_seed = std::stoull(argv[1]);
}
std::vector<Task> jobs;
auto now = std::chrono::high_resolution_clock::now;
auto start = now();
std::generate_n(
std::execution::par_unseq,
back_inserter(jobs),
1ull << 28, // reduce for small RAM!
generate_param);
auto laptime = [&](auto caption) {
std::cout << caption << " in " << (now() - start)/1.0s << "s" << std::endl;
start = now();
};
laptime("generate randum input");
std::sort(
std::execution::par_unseq,
begin(jobs), end(jobs),
Task::ByParam{});
laptime("sort by param");
std::for_each(
std::execution::par_unseq,
begin(jobs), end(jobs),
calculate);
laptime("calculate");
std::sort(
std::execution::par_unseq,
begin(jobs), end(jobs),
Task::ByOutput{});
laptime("sort by output");
auto const checksum = std::transform_reduce(
std::execution::par_unseq,
begin(jobs), end(jobs),
0, std::bit_xor<>{},
std::mem_fn(&Task::output)
);
laptime("reduce");
std::cout << "Checksum: " << checksum << "n";
}

当使用种子42运行时,打印:

generate randum input in 10.8819s
sort by param in 8.29467s
calculate in 0.22513s
sort by output in 5.64708s
reduce in 0.108768s
Checksum: 683872090

除第一个(随机生成)步骤外,所有内核的 CPU 利用率均为 100%。


¹(我想我在这个网站上演示了所有这些答案)。

² 参见 C++17 并行算法是否已经实现?

最新更新