来自多个线程的MPI RMA



在我的应用程序中,我正在实现我称之为";在稀疏向量上减少";通过TBB流图与MPI单边通信(RMA(进行比较。算法的核心部分如下所示:

auto &reduce = m_g_R.add<function_node<ReductionJob, ReductionJob>>(
serial,
[=, &reduced_bi](ReductionJob rj) noexcept
{
const auto r = std::get<0>(rj);
auto *buffer = std::get<1>(rj)->data.data();
auto &mask = std::get<1>(rj)->mask;
if (m_R_comms[r] != MPI_COMM_NULL)
{
const size_t n = reduced_bi.dim(r);
MPI_Win win;
MPI_Win_create(
buffer,
r == mr ? n * sizeof(T) : 0,
sizeof(T),
MPI_INFO_NULL,
m_R_comms[r],
&win
);
if (n > 0 && r != mr)
{
MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
size_t i = 0;
do
{
while (i < n && !mask[i]) ++i;
size_t base = i;
while (i < n && mask[i]) ++i;
if (i > base) MPI_Accumulate(
buffer + base, i - base, MpiType<T>::type,
0,
base, i - base, MpiType<T>::type,
MPI_SUM,
win
);
}
while (i < n);
MPI_Win_unlock(0, win);
}
MPI_Win_free(&win);
}
return rj;
}
);

这是针对参与计算的每个秩r执行的,其中reduced_bi.dim(r)指定每个秩拥有多少元素。mr是当前的秩,并且通信程序是以这样的方式创建的,即目标进程是每个通信程序的根进程。bufferT = double的阵列(通常(,而mask是标识哪些元素是非零的std::vector<bool>。循环的组合将通信拆分为非零元素的块。

这通常工作得很好,结果是正确的,与我以前使用MPI_Reduce的实现相同。然而,似乎至关重要的是,该节点的并发级别设置为serial,这表明最多有一个并行TBB任务(因此最多只有一个线程(执行该代码。

我想将其设置为unlimited以提高性能,事实上,这在我的笔记本电脑上运行MPICH 3.4.1。然而,在我真正想运行计算的集群上,使用OpenMPI 4.1.1,它运行了一段时间,然后出现segfault和涉及一堆UCX函数的回溯。

我现在想知道,是不允许多个线程并行调用这样的RMA操作(在我的笔记本电脑上,它只是意外工作(,还是我在集群上遇到了错误/限制?从文档中,我没有直接看到我想做的事情不受支持。

当然,MPI是用MPI_THREAD_MULTIPLE初始化的,我再次重申,只有当我更改serial->unlimited启用并发执行我在集群上遇到了问题。


在回复Victor Eijkhout的以下评论时,这里有一个完整的示例程序来重现这个问题。这在我的笔记本电脑上运行良好(专门用mpirun -n 16测试(,但当我用16个列(分布在4个集群节点上(运行它时,它在集群上崩溃了。

#include <iostream>
#include <vector>
#include <thread>
#include <mpi.h>
int main(void)
{
int requested = MPI_THREAD_MULTIPLE, provided;
MPI_Init_thread(nullptr, nullptr, requested, &provided);
if (provided != requested)
{
std::cerr << "Failed to initialize MPI with full thread support!"
<< std::endl;
exit(1);
}
int mr, nr;
MPI_Comm_rank(MPI_COMM_WORLD, &mr);
MPI_Comm_size(MPI_COMM_WORLD, &nr);
const size_t dim = 1024;
const size_t repeat = 100;
std::vector<double> send(dim, static_cast<double>(mr) + 1.0);
std::vector<double> recv(dim, 0.0);
MPI_Win win;
MPI_Win_create(
recv.data(),
recv.size() * sizeof(double),
sizeof(double),
MPI_INFO_NULL,
MPI_COMM_WORLD,
&win
);
std::vector<std::thread> threads;
for (size_t i = 0; i < repeat; ++i)
{
threads.clear();
threads.reserve(nr);
for (int r = 0; r < nr; ++r) if (r != mr)
{
threads.emplace_back([r, &send, &win]
{
MPI_Win_lock(MPI_LOCK_SHARED, r, 0, win);
for (size_t i = 0; i < dim; ++i) MPI_Accumulate(
send.data() + i, 1, MPI_DOUBLE,
r,
i, 1, MPI_DOUBLE,
MPI_SUM,
win
);
MPI_Win_unlock(r, win);
});
}
for (auto &t : threads) t.join();
MPI_Barrier(MPI_COMM_WORLD);
if (mr == 0) std::cout << recv.front() << std::endl;
}
MPI_Win_free(&win);
MPI_Finalize();
}

注意:我有意在这里使用纯线程,以避免不必要的依赖关系。应与-lpthread链接。

我在集群上得到的具体错误是,使用OpenMPI 4.1.1:

*** An error occurred in MPI_Accumulate
*** reported by process [1829189442,11]
*** on win ucx window 3
*** MPI_ERR_RMA_SYNC: error executing rma sync
*** MPI_ERRORS_ARE_FATAL (processes in this win will now abort,
***    and potentially your MPI job)

ompi_info:中可能的相关部件

Open MPI: 4.1.1
Open MPI repo revision: v4.1.1
Open MPI release date: Apr 24, 2021
Thread support: posix (MPI_THREAD_MULTIPLE: yes, OPAL support: yes, OMPI progress: no, Event lib: yes)

它是用UCX/1.10.1编译的。

C++中的样式是将*&放入类型,而不是标识符。这是在斯特劳斯特鲁普的第一本书的开头特别指出的,是与C风格的有意区别。


创建--锁定--解锁--释放

⧺R.1使用资源句柄和RAII(资源获取即初始化(自动管理资源

使用包装器类,无论是为此目的编写的、为此C API设计的、通用资源管理器模板,还是带有自定义委托器的unique_ptr,而不是必须匹配才能正确行为的显式调用。

RAII/RFID是C++的基本优势之一,使用它将大大减少代码的缺陷,并提高代码的可维护性。


使用析构函数语法。

const auto r = std::get<0>(rj);
auto *buffer = std::get<1>(rj)->data.data();
auto &mask = std::get<1>(rj)->mask;

您可以立即命名组件,而不是引用get<0>get<1>

const auto& [r, fred] = rj;
auto* buffer = fred->data.data();
auto& mask = fred->mask;

最新更新