如何确保使用配置的 CPU 内核进行多处理代码?



我用multiprocessing Pool来运行parallel。我首先尝试使用4内核,HPC使用 sub。当它使用 4 核时,与 1 核相比,时间减少了 4 倍。当我检查qstat时,有几次它使用 4 个内核,但之后只有 1 个内核,代码完全相同。

你能给我一些建议,我的代码或系统有什么问题吗?

import pandas as pd
import numpy as np
from multiprocessing import Pool
from datetime import datetime
t1 = pd.read_csv("template.csv",header=None)
s1 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_adfr.csv")
s2 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_dock.csv")
s3 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_gemdock.csv")
s4 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_ledock.csv")
s5 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_plants.csv")
s6 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_psovina.csv")
s7 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_quickvina2.csv")
s8 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_smina.csv")
s9 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_vina.csv")
s10 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_vinaxb.csv")
#number of core and arrays
n = 4
m = (len(t1) // n)+1
g= m*n - len(t1)
for g1 in range(g):
t1.loc[len(t1)]=0

results=[]
def block_linear(i):
temp = pd.DataFrame(np.zeros((m,29)))
for a in range(0,m):
sum_matrix = (t1.iloc[a,0]*s1) + (t1.iloc[a,1]*s2) + (t1.iloc[a,2]*s3)+ (t1.iloc[a,3]*s4) + (t1.iloc[a,4]*s5) + (t1.iloc[a,5]*s6) + (t1.iloc[a,6]*s7) + (t1.iloc[a,7]*s8) + (t1.iloc[a,8]*s9) + (t1.iloc[a,9]*s10)
rank_sum= pd.DataFrame.rank(sum_matrix,axis=0,ascending=True,method='min') #real-True
temp.iloc[a,:] = rank_sum.iloc[999].values
temp['median'] = temp.median(axis=1)
temp.index = range(i*m,(i+1)*m)
return temp
start=datetime.now()
if __name__ == '__main__':
pool = Pool(processes=n)
results = pool.map(block_linear,range(0,n))
print(datetime.now()-start)
out=pd.concat(results)
out.drop(out.tail(g).index,inplace=True)
out.to_csv('test_10dock_4core.csv',index=False)

主要思想是将大表切成更小的表格,运行计算并组合在一起。

如果没有更详细地使用多处理的 Pool 包,真的很难理解和帮助。请注意,Pool 包不保证并行化:例如,_apply函数仅使用 Pool 的一个工作线程,并阻止所有执行。您可以在此处和那里查看有关它的更多详细信息。

但是,假设您正确使用该库,您应该确保您的代码是完全可并行化的:例如,磁盘上的 I/O 操作可能会成为并行化的瓶颈,从而使您的代码一次只在一个进程中运行。

我希望它有所帮助。


[编辑] 由于您提供了有关问题的更多详细信息,我可以提供更具体的提示:

第一件事是你的代码是零并行的。您只是调用相同的函数 N 次。这不是多处理应该如何工作的。 相反,应该并行的部分是通常在for循环中的部分,就像您在 block_linear(( 中的那个一样。

所以,我向你推荐的是:

您应该更改代码以首先计算所有加权总和,然后才执行其余操作。这将对并行化有很大帮助。 因此,将此操作放在一个函数中:

def weighted_sum(column,df2):
temp = pd.DataFrame(np.zeros(m))
for a in range(0,m):
result = (t1.iloc[a,column]*df2)
temp.iloc[a] = result
return temp

然后,您使用 pool.starmap 来并行您拥有的 10 个数据帧的函数,如下所示:

结果 = pool.starmap(weighted_sum,[(0,s1(,(1,s2(,(2,s3(,....,[9,s10]](

PS:pool.starmap类似于pool.map,但接受元组参数列表。您可以在此处获得更多详细信息。

最后但并非最不重要的一点是,您应该对结果进行操作以结束计算。由于每列将有一个weighted_sum,因此您可以对列应用总和,然后对rank_sum应用总和。

这不是一个完全可运行的代码来解决你的问题,而是关于你应该如何重组你的代码以获得多处理优势的一般指南。我建议您在数据框的子样本上对其进行测试,以确保在所有数据上运行它之前它正常工作。

最新更新