在 Python 中也包含"for"循环的已定义函数上对"for"循环应用并行处理



我正在寻找Python中复杂for循环的并行处理,并且不知道如何将其应用于我的情况。假设我有一个文件input.txt如下所示:

Group   Process Category        Type    Var1    Var2    Var3
A       3       cat1    type1   86.84   2.913   0.01096
A       3       cat1    type1   103.39  2.835   0.00564
A       3       cat1    type1   109.00  1.478   0.00365
A       3       cat1    type1   107.30  2.979   0.00631
A       3       cat1    type1   123.09  2.424   0.00531
A       3       cat1    type1   111.98  7.462   0.00332
A       841     cat2    type2   87.62   3.049   0.01195
A       841     cat2    type2   87.40   4.781   0.00930
A       841     cat2    type2   88.53   3.025   0.00697
A       841     cat2    type2   85.84   2.703   0.00697

理想情况下,我想按GroupProcessCategoryType进行分组,并使用四个定义的函数对Var1Var2Var3进行一些计算,其中三个也包含循环for。实现output如下:

Group   Type  Process Category        Var1       Var2       Var3
0     A  type1        3     cat1  101.207332  13.997181  106.30899
1     A  type2      841     cat2   87.431341   3.584393  106.30899

实现的完整代码如下:

import pandas as pd
import numpy as np
from dplython import X, sift, DplyFrame, mutate, select
from plydata import define, group_by, summarize
def weightedMean(data):
length = len(data['Var1'])
if length == 1:
mx = data['Var1']
return(length)
else:
mx = data['Var1'][0]
nx = data['Var3'][0]
for i in range(1,length):
my = data['Var1'][i]
ny = data['Var3'][i]
nx = nx + ny
mx=(mx*nx+my*ny)/(nx+ny)
return(mx)
def summation(data):
length = len(data['Var3'])
cx = data['Var3'][0]
for i in range(1,length):
cy = data['Var3'][i]
cx = cx + cy
return(cx)
def sd_c(x_m, x_s, x_n, y_m, y_s, y_n):
al = x_n+y_n
tmp_sd = al*((x_n-1)*(x_s*x_s)+(y_n-1)*(y_s*y_s))+y_n*x_n*(x_m-y_m)*(x_m-y_m)
var = tmp_sd/(al*(al-1))
std = np.sqrt(var)
return(std)
def sd_pooled(data):
length = len(data['Var1'])
if length == 1:
mx = data['Var1']
return(length)
else:
mx = data['Var1'][0]
sx = data['Var2'][0]
nx = data['Var3'][0]
for i in range(1,length):
my = data['Var1'][i]
sy = data['Var2'][i]
ny = data['Var3'][i]
sx = sd_c(mx, sx, nx, my, sy, ny)
nx = nx + ny
mx = (mx*nx + my*ny)/(nx + ny)
return(sx)
dat = pd.read_csv("input.txt",sep="t")
dat_name = dat.loc[:,'Type'].unique()
dat = DplyFrame(dat)
out = pd.DataFrame([])
for i in range(len(dat_name)):
df = (dat >>
sift(X.Type == dat_name[i]) >>
mutate(Var3 = X.Var3*3021) >>
sift(X.Var2 < 50))
out = out.append(df)
out_grouped = out.groupby(['Group', 'Type', 'Process', 'Category'])
init = []
mean = []
stdv = []
freq = []
kmer = []
for name, group in out_grouped:
group = pd.DataFrame(group).reset_index()
nm = name
wm = weightedMean(group)
sd = sd_pooled(group)
fq = summation(group)
init.append(nm)
mean.append(wm)
freq.append(fq)
stdv.append(sd)
init = pd.DataFrame(init)
mean = pd.DataFrame(mean)
freq = pd.DataFrame(freq)
stdv = pd.DataFrame(stdv)
init.rename(columns={0:'Group',1:'Type',2:'Process',3:'Category'}, inplace=True)
mean.rename(columns={0:'Var1'}, inplace=True)
stdv.rename(columns={0:'Var2'}, inplace=True)
freq.rename(columns={0:'Var3'}, inplace=True)
output = pd.concat([init.reset_index(drop=True), mean, stdv, freq], axis=1)

在这种情况下,如何使用多核应用并行处理?提前谢谢。

这里有很多东西要解开,所以请耐心等待。

我已将数据集存储到名为dat的熊猫数据帧中

Group   Process Category    Type    Var1    Var2    Var3
0       A      3    cat1       type1    86.84   2.913   0.01096
1       A      3    cat1       type1    103.39  2.835   0.00564
2       A      3    cat1       type1    109.00  1.478   0.00365
3       A      3    cat1       type1    107.30  2.979   0.00631
4       A      3    cat1       type1    123.09  2.424   0.00531
5       A      3    cat1       type1    111.98  7.462   0.00332
6       A      841  cat2       type2    87.62   3.049   0.01195
7       A      841  cat2       type2    87.40   4.781   0.00930
8       A      841  cat2       type2    88.53   3.025   0.00697
9       A      841  cat2       type2    85.84   2.703   0.00697

下一步是按ProcessCategoryType对表进行分组(我现在省略了Group,因为它似乎只有 1 个唯一值Group)

我使用了.sum()因为这就是您所说的您希望在分组期间聚合Var变量的方式。

out_grouped = dat.groupby(['Type', 'Process', 'Category']).sum()

这是分组后数据帧的外观 -

Var1   Var2    Var3
Type    Process Category            
type1   3         cat1               641.60 20.091  0.03519
type2   841       cat2               349.39 13.558  0.03519

下一步是要在此聚合的每一行上应用函数 -

必须将函数设计为仅依赖于数据帧的每个原子行。

加权均值函数不应窥视其他行进行计算。如果确实有要求,请将这些值作为预处理步骤在数据帧的每一行中可用。

def weightedMean(data):
l = data['Var1']
m = data['Var2']
n = data['Var3']
wm = l*m*n/l+m+n
return wm

最后一步是对数据帧的每一行应用weightedMean函数。

这是如何在单个内核上执行此操作-

out_grouped["weighted_mean"] = out_grouped.apply(weightedMean, axis=1)

要跨多个内核分配此计算 - 您可以使用 DASK API。

附言我很确定我在解释您的问题陈述的方式上犯了一些错误。所以请随时纠正我,我会适当地修改代码。

这是应用函数后数据帧的外观 -

Var1      Var2    Var3    weighted_mean
Type    Process Category                
type1   3       cat1          641.60    20.091  0.03519 20.833192
type2   841     cat2          349.39    13.558  0.03519 14.070296

最新更新