我正在尝试在C++中制作一个多线程的 for 循环,以便将计算划分为多个线程。然而,它包含需要按原样顺序连接在一起的数据。
因此,我们的想法是首先连接许多内核上的小位(25.000+循环),然后在最后再次连接组合数据。
std::vector<int> ids; // mappings
std::map<int, myData> combineData; // data per id
myData outputData; // combined data based on the mappings
myData threadData; // data per thread
#pragma parallel for default(none) private(data, threadData) shared(combineData)
for (int i=0; i<30000; i++)
{
threadData += combineData[ids[i]];
}
// Then here I would like to get all the seperate thread data and combine them in a similar manner
// I.e.: for each threadData: outputData += threadData
解决这个问题的有效和好方法是什么?
如何调度 openmp 循环,以便将调度均匀地分成块
例如,对于 2 个线程:[0, 1, 2, 3, 4, .., 14999] & [15000, 15001, 15002, 15003, 15004, .., 29999]
如果有更好的方法来连接数据(包括将大量 std::vector 连接在一起和一些矩阵数学),同时保留添加指针的顺序也会有所帮助。
新增信息
- 加法是关联的,但不是可交换的。
- myData 不是一种内在类型。它是一个包含多个 std::vector 的数据(以及与 Autodesk Maya API 相关的一些数据)的类。 每个
- 循环对许多点进行类似的矩阵乘法,并将这些点添加到向量中(理论上每个周期的计算时间应该大致相似)
由数据向量组成)相互添加(组合网格),尽管整个事物的顺序占顶点的索引值。顶点索引应一致且可重建。
这取决于 myData
的加法运算符的一些属性。 如果运算符既是关联(A + B) + C = A + (B + C)
又是交换A + B = B + A
那么您可以使用critical
部分,或者如果数据是普通的旧数据(例如浮点数,int,...)则使用reduction
。
但是,如果它不像你说的那样是可交换的(操作顺序很重要),但仍然是关联的,你可以用等于并行组合数据的线程数的元素数填充数组,然后按顺序串行合并它们(参见下面的代码。 使用schedule(static)将或多或少地均匀地拆分块,并根据需要增加线程数。
如果运算符既不是关联运算符也不是可交换的,那么我认为您无法并行化它(有效地 - 例如,尝试有效地并行化斐波那契级数)。
std::vector<int> ids; // mappings
std::map<int, myData> combineData; // data per id
myData outputData; // combined data based on the mappings
myData *threadData;
int nthreads;
#pragma omp parallel
{
#pragma omp single
{
nthreads = omp_get_num_threads();
threadData = new myData[nthreads];
}
myData tmp;
#pragma omp for schedule(static)
for (int i=0; i<30000; i++) {
tmp += combineData[ids[i]];
}
threadData[omp_get_thread_num()] = tmp;
}
for(int i=0; i<nthreads; i++) {
outputData += threadData[i];
}
delete[] threadData;
编辑:在这一点上,我不是 100% 确定块是否会按照#pragma omp for schedule(static)
增加线程数的顺序分配(尽管如果它们不是,我会感到惊讶)。 目前正在就此问题进行讨论。同时,如果您想 100% 确定,那么而不是
#pragma omp for schedule(static)
for (int i=0; i<30000; i++) {
tmp += combineData[ids[i]];
}
你可以做
const int nthreads = omp_get_num_threads();
const int ithread = omp_get_thread_num();
const int start = ithread*30000/nthreads;
const int finish = (ithread+1)*30000/nthreads;
for(int i = start; i<finish; i++) {
tmp += combineData[ids[i]];
}
编辑:
我找到了一种更优雅的方式来并行填充但按顺序合并
#pragma omp parallel
{
myData tmp;
#pragma omp for schedule(static) nowait
for (int i=0; i<30000; i++) {
tmp += combineData[ids[i]];
}
#pragma omp for schedule(static) ordered
for(int i=0; i<omp_get_num_threads(); i++) {
#pragma omp ordered
outputData += tmp;
}
}
这避免了为每个线程(threadData
)分配数据并在并行区域之外合并。
如果您真的想保留与串行情况下相同的顺序,那么除了串行执行之外别无他法。在这种情况下,您可以尝试并行化在 operator+=
中完成的操作。
如果操作可以随机完成,但块的减少有特定的顺序,那么可能值得看看TBB parallel_reduce
。这将需要您编写更多代码,但是如果我记得不错,您可以定义复杂的自定义缩减操作。
如果操作顺序无关紧要,那么您的代码段几乎完成了。它缺少的可能是聚合私有数据的critical
结构:
std::vector<int> ids; // mappings
std::map<int, myData> combineData; // data per id
myData outputData; // combined data based on the mappings
#pragma omp parallel
{
myData threadData; // data per thread
#pragma omp for nowait
for (int ii =0; ii < total_iterations; ii++)
{
threadData += combineData[ids[ii]];
}
#pragma omp critical
{
outputData += threadData;
}
#pragma omp barrier
// From here on you are ensured that every thread sees
// the correct value of outputData
}
在这种情况下,for 循环的计划对于语义并不重要。如果 operator+=
的重载是一个相对稳定的操作(就计算它所需的时间而言),那么您可以使用在线程之间平均分配迭代的schedule(static)
。否则,您可以求助于其他调度来平衡计算负担(例如 schedule(guided)
)。
最后,如果myData
是内部类型的 typedef,则可以避免使用关键部分并使用 reduction
子句:
#pragma omp for reduction(+:outputData)
for (int ii =0; ii < total_iterations; ii++)
{
outputData += combineData[ids[ii]];
}
在这种情况下,您无需将任何内容显式声明为私有。