OpenMP:循环'std::map'基准测试(动态调度)



我必须遍历std::map,并且必须在每次迭代中完成的工作具有以下属性:

  1. 每次迭代中的工作量会有所不同;
  2. 线程之间不需要任何同步。

看起来是动态调度的完美方案,不是吗?

然而,就OpenMP的循环并行化而言,非随机访问迭代器(如std::map(是臭名昭著的麻烦。对我来说,这个特定代码的性能将是至关重要的,因此在寻找最有效的解决方案时,我创建了以下基准:

#include <omp.h>
#include <iostream>
#include <map>
#include <vector>
#define COUNT 0x00006FFF
#define UNUSED(variable) (void)(variable)
using std::map;
using std::vector;
void test1(map<int, vector<int> >& m) {
  double time = omp_get_wtime();
  map<int, vector<int> >::iterator iterator = m.begin();
#pragma omp parallel
#pragma omp for schedule(dynamic, 1) nowait
  for (size_t i = 0; i < m.size(); ++i) {
    vector<int>* v;
#pragma omp critical
    v = &iterator->second;
    for (size_t j = 0; j < v->size(); ++j) {
      (*v)[j] = j;
    }
#pragma omp critical
    iterator++;
  }
  printf("Test #1: %f sn", (omp_get_wtime() - time));
}
void test2(map<int, vector<int> >& m) {
  double time = omp_get_wtime();
#pragma omp parallel
  {
    for (map<int, vector<int> >::iterator i = m.begin(); i != m.end(); ++i) {
#pragma omp single nowait
      {
        vector<int>& v = i->second;
        for (size_t j = 0; j < v.size(); ++j) {
          v[j] = j;
        }
      }
    }
  }
  printf("Test #2: %f sn", (omp_get_wtime() - time));
}
void test3(map<int, vector<int> >& m) {
  double time = omp_get_wtime();
#pragma omp parallel
  {
    int thread_count = omp_get_num_threads();
    int thread_num = omp_get_thread_num();
    size_t chunk_size = m.size() / thread_count;
    map<int, vector<int> >::iterator begin = m.begin();
    std::advance(begin, thread_num * chunk_size);
    map<int, vector<int> >::iterator end = begin;
    if (thread_num == thread_count - 1)
      end = m.end();
    else
      std::advance(end, chunk_size);
    for (map<int, vector<int> >::iterator i = begin; i != end; ++i) {
      vector<int>& v = i->second;
      for (size_t j = 0; j < v.size(); ++j) {
        v[j] = j;
      }
    }
  }
  printf("Test #3: %f sn", (omp_get_wtime() - time));
}
int main(int argc, char** argv) {
  UNUSED(argc);
  UNUSED(argv);
  map<int, vector<int> > m;
  for (int i = 0; i < COUNT; ++i) {
    m[i] = vector<int>(i);
  }
  test1(m);
  test2(m);
  test3(m);
}

我可以想出 3 种可能的变体来模仿我的任务。代码非常简单,不言自明,请看一下。我已经运行了几次测试,这是我的结果:

Test #1: 0.169000 s
Test #2: 0.203000 s
Test #3: 0.194000 s
Test #1: 0.167000 s
Test #2: 0.203000 s
Test #3: 0.191000 s
Test #1: 0.182000 s
Test #2: 0.202000 s
Test #3: 0.197000 s
Test #1: 0.167000 s
Test #2: 0.187000 s
Test #3: 0.211000 s
Test #1: 0.168000 s
Test #2: 0.195000 s
Test #3: 0.192000 s
Test #1: 0.166000 s
Test #2: 0.197000 s
Test #3: 0.199000 s
Test #1: 0.184000 s
Test #2: 0.198000 s
Test #3: 0.199000 s
Test #1: 0.167000 s
Test #2: 0.202000 s
Test #3: 0.207000 s

我发布这个问题是因为我发现这些结果很奇特,绝对出乎意料:

    预期测试 #
  1. 2 是最快的,因为它不像测试 #1 那样使用关键部分;
  2. 预期测试 #3 是最慢的,因为它并没有真正利用动态调度,而是依赖于作业的静态分布(手动完成(;
  3. 可能从未预料到测试 #2 大致相当于测试 #3,有时甚至更糟。

问题是:

  1. 我错过了什么吗?
  2. 你能解释一下测试结果吗?
  3. 您对这里的并行化有更好的了解吗?
  1. 您对这里的并行化有更好的了解吗?

您可以尝试模仿 OpenMP 循环的schedule(static,1),即线程不处理一大块连续迭代,而是以thread_count步的速度处理迭代。这是代码:

void test4(map<int, vector<int> >& m) {
  double time = omp_get_wtime();
#pragma omp parallel
  {
    int thread_count = omp_get_num_threads();
    int thread_num = omp_get_thread_num();
    size_t map_size = m.size();
    map<int, vector<int> >::iterator it = m.begin();
    std::advance(it, thread_num);
    for (int i = thread_num; i < map_size; i+=thread_count) {
      vector<int>& v = it->second;
      for (size_t j = 0; j < v.size(); ++j) {
        v[j] = j;
      }
      if( i+thread_count < map_size ) std::advance(it, thread_count);
    }
  }
  printf("Test #4: %f sn", (omp_get_wtime() - time));
}

schedule(static,1)提供了比schedule(static)更好的负载平衡,以防工作量在迭代空间中增加或减少。测试工作负载就是这种情况。如果每次迭代的工作量是随机的,这两种策略平均应该提供相同的平衡。

另一种变体是在原子计数器的帮助下模仿schedule(dynamic)。代码:

void test5(map<int, vector<int> >& m) {
  double time = omp_get_wtime();
  int count = 0;
#pragma omp parallel shared(count)
  {
    int i;
    int i_old = 0;
    size_t map_size = m.size();
    map<int, vector<int> >::iterator it = m.begin();
#pragma omp atomic capture
    i = count++;
    while (i < map_size) {
      std::advance(it, i-i_old);
      vector<int>& v = it->second;
      for (size_t j = 0; j < v.size(); ++j) {
        v[j] = j;
      }
      i_old = i;
#pragma omp atomic capture
      i = count++;
    }
  }
  printf("Test #5: %f sn", (omp_get_wtime() - time));
}

在循环中,线程决定它应该在映射上推进其本地迭代器的程度。线程首先原子地递增计数器并获取其先前的值,从而获得迭代索引,然后按新索引与前一个索引之间的差异推进迭代器。循环重复,直到计数器增加到地图大小以上。

最新更新