我正在通过mpi4py学习并行计算。因为我处理的是一个大数据集,所以我需要在主进程中预先分配内存,以免出现内存问题。这就是我使用Scatterv
和Gatherv
方法的原因。建议的代码只有分配内存的作用域,而不做任何特定的操作
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
if rank == 0:
sendbuf = np.random.rand(4,3)
r, c = sendbuf.shape
ave, res = divmod(c, nprocs)
count = [ave + 1 if p < res else ave for p in range(nprocs)]
count = np.array(count)
print("count is ", count)
# displacement: the starting index of each sub-task
displ = [sum(count[:p]) for p in range(nprocs)]
displ = np.array(displ)
else:
sendbuf = None
# initialize count on worker processes
count = np.zeros(nprocs, dtype=np.int)
displ = None
# broadcast count
comm.Bcast(count, root=0)
# initialize recvbuf on all processes
recvbuf = np.zeros((4,count[rank]))
comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0)
a, b = recvbuf.shape
sendbuf2 = np.random.rand(a,b)
recvbuf2 = np.zeros((4,sum(count)))
comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)
在主进程中,我首先定义了一个维数为(4,3)的随机二维数组(sendbuf
)。我想做的是将这个矩阵分散到不同的进程中,方法是将它分成列(这样就保留了行数)。然后,我初始化recvbuf
变量,以便接收sendbuf
的信息块。然后使用Scatterv
方法传递信息。我注意到只有第一行中的数据被正确传递。这其实并不重要,因为在实际应用程序中,recvbuf
变量仅用于预分配内存。此时,我重新定义了recvbuf
变量,然后尝试将信息发送回主节点,但是代码给出了错误。我真的不明白我在Gatherv
部分做错了什么。
我试图使示例尽可能简单,这样代码就不会做任何特定的事情。我想学习的是如何正确地散射和收集二维numpy数组。
你到底有什么"错误" ?
下面是使用2D数组的gatherv的工作版本。请记住,大小是随着2D数组的大小而缩放的,因为NumPy在内存中具有数组行为主
from mpi4py import MPI
import numpy as np
comm_world = MPI.COMM_WORLD
my_rank = comm_world.Get_rank()
num_proc = comm_world.Get_size()
# Parameters for this script
rowlength = 2
sizes = 2*np.ones((num_proc), dtype=np.int32)
sizes[-1] = 1
# Construct some data
data = [np.array((), dtype=np.double) for _ in range(num_proc)]
data[my_rank] = np.array(my_rank+np.random.rand(sizes[my_rank], rowlength), np.double)
# Compute sizes and offsets for Allgatherv
sizes_memory = rowlength*sizes
offsets = np.zeros(num_proc)
offsets[1:] = np.cumsum(sizes_memory)[:-1]
if my_rank == 0:
print(f"Total size {np.sum(sizes)}")
print(f"Sizes: {sizes}")
print(f"Sizes: {sizes_memory}")
print(f"Offsets: {offsets}")
# Prepare buffer for Allgatherv
data_out = np.empty((np.sum(sizes), rowlength), dtype=np.double)
comm_world.Allgatherv(
data[my_rank],
recvbuf=[data_out, sizes_memory.tolist(), offsets.tolist(), MPI.DOUBLE])
if (my_rank == 0):
print(f"Data_out has shape {data_out.shape}")
print(data_out[:, 0])
Linux操作系统,MPI: 'mpirun (Open MPI) 4.1.2', MPI4PY: ' MPI4PY 3.1.4'