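How to scatter/send all possible column pairs to child processes with Python mpi4py and find the coherence between columns? Parallel computing

I have a large matrix/2D array, and for every possible pair of its columns I need to compute the coherence using parallel computing in Python (e.g., mpi4py). The coherence [a function] is computed in different child processes; each child process should send its coherence value to the parent process, which gathers the coherence values as a list. To do this, I created a small matrix and a list of all possible column pairs, as follows: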

import numpy as np
from scipy import signal
from itertools import combinations
from mpi4py import MPI

comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()
data = np.arange(20).reshape(5, 4)
# List of all possible column pairs
data_col = list(combinations(np.transpose(data), 2))  # list

# Function creation
def myFunc(X, Y):
    ..................
    ..................
    return Real_coh

if rank == 0:
    Data = comm.scatter(data_col, root=0)  # col_pair

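Can anyone suggest how I should proceed? Any questions or requests for clarification are welcome. I look forward to your kind help. Thanks.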

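Check the following script [it uses comm.Barrier to synchronize the processes]. In the script, I write the data to an h5py dataset and read it back one chunk at a time, which is memory-efficient.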

import numpy as np
from scipy import signal
from mpi4py import MPI
import h5py as t

chunk_len = 5000  # No. of rows per HDF5 chunk
num_c = 34        # No. of columns of the matrix
chunk_size = (chunk_len, 1)  # each chunk holds part of a single column

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Only rank 0 creates the file, so the processes do not overwrite each other
if rank == 0:
    # Actual dataset
    data_mat = np.random.random((10000, num_c))
    with t.File('file_name.h5', 'w') as hf:
        hf.create_dataset("chunked_arr", data=data_mat, chunks=chunk_size, compression='lzf')
    del data_mat
comm.Barrier()  # wait until the file exists before any rank reads it

def myFunc(dset_X, dset_Y):
    ..............
    ............
    return Real_coh

res = np.zeros((num_c, num_c))
with t.File('file_name.h5', 'r', libver='latest') as hf:
    for i in range(num_c):
        # Rows are dealt out round-robin: rank r handles every i with i % size == r
        if i % size == rank:
            dset_X = hf['chunked_arr'][:, i]  # Chunk data reading
            for j in range(num_c):
                dset_Y = hf['chunked_arr'][:, j]  # Chunk data reading
                res[i][j] = myFunc(dset_X, dset_Y)

comm.Barrier()  # wait for all ranks to finish
# Each rank has filled only its own rows of res; the other rows are still zero
print('Shape of final result :', res.shape)
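
The question asks for the coherence values to end up in a list on the parent process, while the script above leaves each rank's rows in that rank's own copy of `res`. A minimal sketch of the scatter/gather pattern is given below, under some assumptions: `pair_metric` is a hypothetical stand-in for the elided `myFunc` (it returns an absolute Pearson correlation, not a real coherence), `split_round_robin` and `run_pairs` are illustrative names, and the single-process fallback when mpi4py cannot be imported is only there so the sketch can be tried without MPI. Note that `comm.scatter` is a collective call: every rank has to call it, and the root has to pass a sequence with exactly one item per process, which is why the pairs are first split into `size` chunks.

```python
from itertools import combinations

import numpy as np

try:
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()
except ImportError:
    # Assumption for this sketch: run as a single process without mpi4py.
    comm, rank, size = None, 0, 1


def pair_metric(x, y):
    """Hypothetical stand-in for myFunc: absolute Pearson correlation."""
    return float(abs(np.corrcoef(x, y)[0, 1]))


def split_round_robin(items, n_parts):
    """Split items into exactly n_parts lists (some may be empty)."""
    return [items[k::n_parts] for k in range(n_parts)]


def run_pairs(data, func):
    """Return sorted (i, j, value) triples on rank 0 and None elsewhere."""
    chunks = None
    if rank == 0:
        # Scatter column indices rather than the column data itself.
        pairs = list(combinations(range(data.shape[1]), 2))
        chunks = split_round_robin(pairs, size)
    # Collective call: every rank takes part, only the root supplies data.
    if comm is not None:
        my_pairs = comm.scatter(chunks, root=0)
    else:
        my_pairs = chunks[0]
    my_results = [(i, j, func(data[:, i], data[:, j])) for i, j in my_pairs]
    # The root receives one list per rank; the other ranks receive None.
    if comm is not None:
        gathered = comm.gather(my_results, root=0)
    else:
        gathered = [my_results]
    if rank != 0:
        return None
    return sorted(r for part in gathered for r in part)


# Every rank builds the same small matrix from a fixed seed.
data = np.random.default_rng(0).random((50, 4))
results = run_pairs(data, pair_metric)
if rank == 0:
    for i, j, value in results:
        print(f"columns ({i}, {j}): {value:.3f}")
```

Saved under a hypothetical name such as `pairs_sketch.py`, this could be launched with `mpiexec -n 4 python pairs_sketch.py`. Scattering index pairs keeps the messages small; for a matrix too large to hold on every rank, each process could instead read its columns from the HDF5 file as in the script above.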