Gatherv 2D numpy array mpi4py



我正在通过mpi4py学习并行计算。因为我处理的是一个大数据集,所以我需要在主进程中预先分配内存,以免出现内存问题。这就是我使用ScattervGatherv方法的原因。建议的代码只有分配内存的作用域,而不做任何特定的操作

import numpy as np
from mpi4py import MPI

comm   = MPI.COMM_WORLD
rank   = comm.Get_rank()
nprocs = comm.Get_size()

if rank == 0:
    
    sendbuf = np.random.rand(4,3)
    r, c    = sendbuf.shape
    ave, res = divmod(c, nprocs)
    count = [ave + 1 if p < res else ave for p in range(nprocs)]
    count = np.array(count)
    
    print("count is ", count)
    # displacement: the starting index of each sub-task
    displ = [sum(count[:p]) for p in range(nprocs)]
    displ = np.array(displ)
else:
    sendbuf = None
    # initialize count on worker processes
    count = np.zeros(nprocs, dtype=np.int)
    displ = None
    
    
# broadcast count
comm.Bcast(count, root=0)
# initialize recvbuf on all processes
recvbuf = np.zeros((4,count[rank]))

comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0) 

a, b     = recvbuf.shape
sendbuf2 = np.random.rand(a,b)
recvbuf2 = np.zeros((4,sum(count)))
comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)

在主进程中,我首先定义了一个维数为(4,3)的随机二维数组(sendbuf)。我想做的是将这个矩阵分散到不同的进程中,方法是将它分成列(这样就保留了行数)。然后,我初始化recvbuf变量,以便接收sendbuf的信息块。然后使用Scatterv方法传递信息。我注意到只有第一行中的数据被正确传递。这其实并不重要,因为在实际应用程序中,recvbuf变量仅用于预分配内存。此时,我重新定义了recvbuf变量,然后尝试将信息发送回主节点,但是代码给出了错误。我真的不明白我在Gatherv部分做错了什么。

我试图使示例尽可能简单,这样代码就不会做任何特定的事情。我想学习的是如何正确地散射和收集二维numpy数组。

你到底有什么"错误" ?

下面是使用2D数组的gatherv的工作版本。请记住,大小是随着2D数组的大小而缩放的,因为NumPy在内存中具有数组行为主

from mpi4py import MPI
import numpy as np
comm_world = MPI.COMM_WORLD
my_rank = comm_world.Get_rank()
num_proc = comm_world.Get_size()
# Parameters for this script
rowlength = 2
sizes = 2*np.ones((num_proc), dtype=np.int32)
sizes[-1] = 1
# Construct some data
data = [np.array((), dtype=np.double) for _ in range(num_proc)]
data[my_rank] = np.array(my_rank+np.random.rand(sizes[my_rank], rowlength), np.double)
# Compute sizes and offsets for Allgatherv
sizes_memory = rowlength*sizes
offsets = np.zeros(num_proc)
offsets[1:] = np.cumsum(sizes_memory)[:-1]
if my_rank == 0:
   print(f"Total size {np.sum(sizes)}")
   print(f"Sizes: {sizes}")
   print(f"Sizes: {sizes_memory}")
   print(f"Offsets: {offsets}")
# Prepare buffer for Allgatherv
data_out = np.empty((np.sum(sizes), rowlength), dtype=np.double)
comm_world.Allgatherv(
   data[my_rank],
   recvbuf=[data_out, sizes_memory.tolist(), offsets.tolist(), MPI.DOUBLE])
if (my_rank == 0):
   print(f"Data_out has shape {data_out.shape}")
   print(data_out[:, 0])

Linux操作系统,MPI: 'mpirun (Open MPI) 4.1.2', MPI4PY: ' MPI4PY 3.1.4'

最新更新