我有这段代码,我试图与OpenMP并行。它使用装载了点(X, Y和Z坐标)的向量的向量,然后它旋转和移动这些点,并在网格中找到最低的点。代码:
double min_x = *min_element(points[0].begin(), points[0].end());
double min_y = *min_element(points[1].begin(), points[1].end());
double min_z = *min_element(points[2].begin(), points[2].end());
// reduction of coordinates
vector<vector<double>> reduced_points(3);
for (int i = 0; i < 3; i++) {
reduced_points[i].resize(points[i].size());
for (unsigned long int j = 0; j < points[i].size(); j++) {
reduced_points[i][j] = points[i][j] - (i == 0 ? min_x : i == 1 ? min_y : min_z);
}
}
// conversion of rotation angles to radians
#pragma omp parallel for collapse(3)
for (int m = 0; m < num_a; m++) {
for (int n = 0; n < num_b; n++) {
for (int o = 0; o < num_g; o++) {
double alpha = (alpha_gon[m] / 200) * numbers::pi;
double beta = (beta_gon[n] / 200) * numbers::pi;
double gamma = (gamma_gon[o] / 200) * numbers::pi;
// define rotation matrices for x, y, and z axes
vector<vector<double>> x_rot_matrix = {{1, 0, 0}, {0, cos(alpha), sin(alpha)}, {0, -sin(alpha), cos(alpha)}};
vector<vector<double>> y_rot_matrix = {{cos(beta), 0, -sin(beta)}, {0, 1, 0}, {sin(beta), 0, cos(beta)}};
vector<vector<double>> z_rot_matrix = {{cos(gamma), sin(gamma), 0}, {-sin(gamma), cos(gamma), 0}, {0, 0, 1}};
// apply rotations to points in reduced_points and store in rotated_points
vector<vector<double>> rotated_points(3);
for (int i = 0; i < 3; i++) {
rotated_points[i].resize(reduced_points[i].size());
for (unsigned long int j = 0; j < reduced_points[i].size(); j++) {
double x = reduced_points[0][j];
double y = reduced_points[1][j];
double z = reduced_points[2][j];
rotated_points[i][j] = x_rot_matrix[i][0] * x + x_rot_matrix[i][1] * y + x_rot_matrix[i][2] * z;
rotated_points[i][j] += y_rot_matrix[i][0] * x + y_rot_matrix[i][1] * y + y_rot_matrix[i][2] * z;
rotated_points[i][j] += z_rot_matrix[i][0] * x + z_rot_matrix[i][1] * y + z_rot_matrix[i][2] * z;
}
}
// shifting to origin
double to_origin_x = *min_element(rotated_points[0].begin(), rotated_points[0].end());
double to_origin_y = *min_element(rotated_points[1].begin(), rotated_points[1].end());
double to_origin_z = *min_element(rotated_points[2].begin(), rotated_points[2].end());
vector<vector<double>> to_origin_points(3);
for (int i = 0; i < 3; i++) {
to_origin_points[i].resize(rotated_points[i].size());
for (unsigned long int j = 0; j < rotated_points[i].size(); j++) {
to_origin_points[i][j] = rotated_points[i][j] - (i == 0 ? to_origin_x : i == 1 ? to_origin_y : to_origin_z);
}
}
//adding shift
for (int q = 0; q < number_of_shifts; q++) {
for (int r = 0; r < number_of_shifts; r++) {
vector<vector<double>> shifted_points(3);
for (int i = 0; i < 3; i++) {
shifted_points[i].resize(to_origin_points[i].size());
for (unsigned long int j = 0; j < to_origin_points[i].size(); j++) {
shifted_points[i][j] = to_origin_points[i][j] + (i == 0 ? shift_x * q : i == 1 ? shift_y * r : 0);
}
}
// find maximum values in each column
double max_x = *max_element(shifted_points[0].begin(), shifted_points[0].end());
double max_y = *max_element(shifted_points[1].begin(), shifted_points[1].end());
//determine raster size
int limit_x = ceil(max_x/raster_size);
int limit_y = ceil(max_y/raster_size);
vector<vector<double>> grid(limit_x, vector<double>(limit_y));
// finding ground
int grid_x;
int grid_y;
for (unsigned long int i = 0; i < shifted_points[0].size(); i++) {
grid_x = floor(shifted_points[0][i]/raster_size);
grid_y = floor(shifted_points[1][i]/raster_size);
if (grid[grid_x][grid_y] == 0)
{
grid[grid_x][grid_y] = i;
} else if (shifted_points[2][i] < shifted_points[2][grid[grid_x][grid_y]]) {
grid[grid_x][grid_y] = i;
}
}
for (int i = 0; i < grid.size(); i++) {
for (int j = 0; j < grid[i].size(); j++) {
int index = grid[i][j];
if (index != 0) {
#pragma omp critical
{
all_lowest_points.insert(index);
count_map[index]++;
}
}
}
}
}
}
}
}
}
这个解决方案有效,但由于我是初学者,我有一种感觉,可能有一个更好的。此外,如果将'raster_size'设置为小于1的值,计算会突然变得慢得多,并且只使用一小部分CPU功率。我正在寻找任何改进我的代码。
正如Victor Eijkhout所建议的,最好的解决方案是定义一个自定义的压缩。另一种方法是手动减少,虽然效率稍低,但可能仍然比初始代码效率高得多,因为每个线程只遇到一个临界(而不是在循环中多次遇到)。
首先,您必须分割omp parallel for
并声明应用缩减的对象的本地/私有版本。
#pragma omp parallel
{
// declare here all_lowest_points_local count_map_local
// with the same characteristics as the global version
#pragma omp for collapse(3)
for (int m = 0; m < num_a; m++) {
for (int n = 0; n < num_b; n++) {
for (int o = 0; o < num_g; o++) {
在您的omp critical
区域,您更新本地版本而不是全局版本。这样,您就可以删除critical
pragma
在并行部分结束之前更新全局版本。
} // end of o loop
} // end of n loop
} // end of m loop
#pragma omp critical
{
// here you update all_lowest_points and count_map
// using all_lowest_points_local and count_map_local
}
} // end of omp parallel
-
您要并行处理的数据点数量偏低。
-
你是正确的标记两行
critical
,不幸的是,这对性能不好。由于您使用的是c++,如果定义一个将lowest_points
和count
数组合并的约简运算符,则可以将其转换为约简运算符。我会写一个类low_points
并定义operator+
。然后,所有程序都将运行得非常快,几乎完全并行。