我正在使用MPI解决负载平衡问题:主进程将任务发送到从进程,并在它们计算和发回作业时收集结果。由于我想尽可能地提高性能,因此我使用非阻塞通信:Master 发送多个任务,然后等到一个进程发回其响应,以便 master 可以向其发送额外的工作,依此类推。
我使用 MPI_Waitany((,因为我事先不知道哪个从进程首先响应,然后我从状态中获取发送者,我可以将新作业发送到它。
我的问题是,有时我得到的发件人是错误的(排名不在MPI_COMM_WORLD中(,程序崩溃;其他时候它工作正常。
这是代码。谢谢!
//master
if (rank == 0) {
int N_chunks = 10;
MPI_Request request[N_chunks];
MPI_Status status[N_chunks];
int N_computed = 0;
int dest,index_completed;
//initialize array of my data structure
vec send[N_chunks];
vec recv[N_chunks];
//send one job to each process in communicator
for(int i=1;i<size;i++){
MPI_Send( &send[N_computed], 1, mpi_vec_type, i, tag, MPI_COMM_WORLD);
MPI_Irecv(&recv[N_computed], 1, mpi_vec_type, i, tag, MPI_COMM_WORLD,
&request[N_computed]);
N_computed++;
}
// loop
while (N_computed < N_chunks){
//get processed messages
MPI_Waitany(N_computed,request,&index_completed,status);
//get sender ID dest
dest = status[index_completed].MPI_SOURCE;
//send a new job to that process
MPI_Send( &send[N_computed], 1, mpi_vec_type, dest, tag, MPI_COMM_WORLD);
MPI_Irecv(&recv[N_computed], 1, mpi_vec_type, dest, tag, MPI_COMM_WORLD,
&request[N_computed]);
N_computed++;
}
MPI_Waitall(N_computed,request,status);
//close all process
printf("End mastern");
}
您没有正确使用MPI_Waitany()
。
它应该是
MPI_Status status;
MPI_Waitany(N_computed,request,&index_completed,&status);
dest = status.MPI_SOURCE;
注意:
- 您需要一个额外的循环来
MPI_Wait()
最后size - 1
请求 - 您可以修改算法并使用
MPI_Request request[size-1];
从而节省一些内存
我忘了在等待所有待处理请求的初始帖子中添加一行:顺便说一句,如果我每次执行 Waitany 时初始化一个新状态,主进程就会崩溃;我需要跟踪哪些进程仍处于挂起状态,以便等待适当的次数......顺便说一下,谢谢
编辑:现在即使我发现它不是很聪明,它也可以工作;是否可以在开始时初始化一个MPI_Status数组,而不是每次在等待之前都这样做?
//master
if (rank == 0) {
int N_chunks = 10;
MPI_Request request[size-1];
int N_computed = 0;
int dest;
int index_completed;
//initialize array of vec
vec send[N_chunks];
vec recv[N_chunks];
//initial case
for(int i=1;i<size;i++){
MPI_Send(&send[N_computed], 1, mpi_vec_type, i, tag, MPI_COMM_WORLD);
MPI_Irecv(&recv[N_computed], 1, mpi_vec_type, i, tag, MPI_COMM_WORLD, &request[N_computed]);
N_computed++;
}
// loop
while (N_computed < N_chunks){
MPI_Status status;
//get processed messages
MPI_Waitany(N_computed,request,&index_completed,&status);
//get sender ID dest
dest = status.MPI_SOURCE;
MPI_Send(&send[N_computed], 1, mpi_vec_type, dest, tag, MPI_COMM_WORLD);
MPI_Irecv(&recv[N_computed], 1, mpi_vec_type, dest, tag, MPI_COMM_WORLD, &request[N_computed]);
N_computed++;
}
//wait other process to send back their load
for(int i=0;i<size-1;i++){
MPI_Status status;
MPI_Waitany(N_computed, request, &index_completed,&status);
}
//end
printf("Ms finishn");
}