为什么我的多处理代码在大型数据集上停止工作



我试图计算特征的平方矩阵(Information_Gains_Matrix(和相应的平方权重矩阵(Weights_Matrix(的莫兰指数。对于Information_Gains_Matrix中的每个特征,我想在Weights_Matrix固定的情况下计算莫兰指数。

因此,我尝试使用multiprocessingpool.map来处理Information_Gains_Matrix的每个特性。我可以让代码在小型测试数据集上以各种方式实现这一点。然而,当我使用实际的大数据集时,代码会运行,但随后CPU使用率下降到0%,进程挂起,并且没有发布任何内容。

我尝试过使用全局变量和共享变量以防出现内存问题,也尝试过使用不同的队列方法以防出现这种问题,但都没有成功。以下代码是其中一个示例,适用于小数据集,但不适用于大数据集。

import multiprocessing
from multiprocessing import RawArray, Pool, Lock
from functools import partial 
import numpy as np
## Set up initial fake data
Information_Gains_Matrix = np.random.uniform(0,1,(22000,22000))
Weights_Matrix = np.random.uniform(0,1,(22000,22000))
## Function I want to parallelise.  
def Feature_Moran_Index(Chunks,Wij,N):   
Moran_Index_Scores = np.zeros(Chunks.shape[0])
for i in np.arange(Chunks.shape[0]):
print(Chunks[i]) # Print ind to show it's running
Feature = Information_Gains_Matrix[Chunks[i],:]    
X_bar = np.mean(Feature)
if X_bar != 0:
Deviance = Feature - X_bar
Outer_Deviance = np.outer(Deviance,Deviance)
Deviance2 = Deviance * Deviance
Denom = np.sum(Deviance2)
Moran_Index_Scores[i] = (N/Wij) * (np.sum((W * np.ndarray.flatten(Outer_Deviance)))/Denom)
return Moran_Index_Scores
## Set up chunks inds for each core.
Use_Cores = (multiprocessing.cpu_count()-2)
Chunk_Size = np.ceil(Information_Gains_Matrix.shape[0] / Use_Cores)
Range = np.arange(Information_Gains_Matrix.shape[0]).astype("i")
Chunk_Range = np.arange(Chunk_Size).astype("i")
Chunks = []
for i in np.arange(Use_Cores-1):
Chunks.append(Range[Chunk_Range])
Range = np.delete(Range,Chunk_Range)
Chunks.append(Range)
if __name__ == '__main__':
W = RawArray('d', Information_Gains_Matrix.shape[0] * Information_Gains_Matrix.shape[1])
W_np = np.frombuffer(W, dtype=np.float64).reshape((Information_Gains_Matrix.shape[0], Information_Gains_Matrix.shape[1]))
np.copyto(W_np, Weights_Matrix)
N = Information_Gains_Matrix.shape[0]
Wij = np.sum(Weights_Matrix)  
with Pool(processes=Use_Cores) as pool:
Results = pool.map(partial(Feature_Moran_Index, Wij=Wij,N=N), Chunks)
Moran_Index_Score = np.concatenate(Results)

我对这种方法不忠诚,如果有人能以任何方式帮助我在各个特征之间并行计算莫兰指数,我将非常感激,因为我似乎无法使其发挥作用。

Feature_Moran_Index中,Deviance的形状为(22000,),而Outer_Deviance的形状为(22000, 22000),并使用3.8GB的RAM。

数量

np.sum(W * np.ndarray.flatten(Outer_Deviance))

等于

np.sum(W_np * Outer_Deviance)

等于

Deviance @ W_np @ Deviance

将第一个表达式替换为最后一个表达式,并删除Outer_Deviance的定义后,您的程序将以大约11GB的内存使用率运行完成。

最新更新