在Python中,multiprocessing.pool花费的时间与for循环花费的时间大致相同



我正在尝试使用python来处理来自几个数据站的一些大型数据集。我的想法是使用multiprocessing.pool为每个CPU分配来自单个工作站的数据,因为每个工作站的数据彼此独立。

然而,与单循环相比,我的计算时间似乎并没有真正下降。

这是我的部分代码:

#function calculating the square of each data point, and taking the cumulative sum
def get_cumdd(data):
#if not isinstance(data, list):
#    data = [data]
dd = np.zeros((len(data),1))
cum_dd = np.zeros((len(data),1))
for i in range(len(data)):
dd[i] = data[i]**2
cum_dd=np.cumsum(dd)
return cum_dd
#parallelization between each station 
if __name__ == '__main__':
n_proc = np.min([mp.cpu_count(),nstation]) #nstation = 10
p = mp.Pool(processes=int(n_proc)) 
result = p.map(get_cumdd,data)
p.close()
p.join()
cum_dd = np.zeros((nstation,len(data[0])))
for i in range(nstation):
cum_dd[i] = result[i].T 

我不使用chunksize,因为cum_dd取之前所有数据^2的总和。我基本上将我的数据分成10个相等的部分,因为进程之间没有通信。我想知道我是否错过了什么。

我的数据每天每个站点有200万个点,我需要处理多年的数据。

这并不能直接解决您的多处理问题,但(正如Ugur MULUK和Iguananaut所提到的(我认为您的get_cumdd函数效率低下。Numpy提供np.cumsum。重新实现您的函数,对于一个包含10k个元素的数组,我可以获得超过1000倍的加速。有了10万个元素,它的速度大约快了7000倍。有了2M的元素,我没有费心让它结束。

# your function
def cum_dd(data):
#if not isinstance(data, list):
#    data = [data]
dd = np.zeros((len(data),1))
cum_dd = np.zeros((len(data),1))
for i in range(len(data)):
dd[i] = data[i]**2
cum_dd[i]=np.sum(dd[0:i])
return cum_dd
# numpy implementation
def cum_dd2(data):
# adding an axis to match the shape of the output of your cum_dd function
return np.cumsum(data**2)[:, np.newaxis]

对于2e6点,这个实现在我的计算机上需要大约11ms。我认为对于一个站点来说,10年的数据大约是30秒。

NumPy已经在CPU和GPU上实现了高效的并行处理。处理算法使用单指令多数据(SIMD(指令。

通过手动汇集计算,您正在降低效率。您可以通过对显式for循环进行矢量化来提高性能。

有关矢量化的更多信息,请参阅下面的视频。

https://www.youtube.com/watch?v=qsIrQi0fzbY

如果你有困难,我会随时提供最新消息或帮助。祝你好运

非常感谢您的评论和回答!在应用了矢量化和池化之后,我将计算时间从1小时减少到了3秒(10*170万个数据点(。我在这里有我的代码,以防有人感兴趣,

def get_cumdd(data):
#if not isinstance(data, list):
#    data = [data]
dd = np.zeros((len(data),1))
for i in range(len(data)):
dd[i] = data[i]**2
cum_dd=np.cumsum(dd)
return dd,cum_dd
if __name__ == '__main__':
n_proc = np.min([mp.cpu_count(),nstation])
p = mp.Pool(processes=int(n_proc))       
result = p.map(CC.get_cumdd,d)
p.close()
p.join()

我不使用共享内存队列,因为我的所有进程都是相互独立的。

最新更新