我正在寻找Python中复杂for
循环的并行处理,并且不知道如何将其应用于我的情况。假设我有一个文件input.txt
如下所示:
Group Process Category Type Var1 Var2 Var3
A 3 cat1 type1 86.84 2.913 0.01096
A 3 cat1 type1 103.39 2.835 0.00564
A 3 cat1 type1 109.00 1.478 0.00365
A 3 cat1 type1 107.30 2.979 0.00631
A 3 cat1 type1 123.09 2.424 0.00531
A 3 cat1 type1 111.98 7.462 0.00332
A 841 cat2 type2 87.62 3.049 0.01195
A 841 cat2 type2 87.40 4.781 0.00930
A 841 cat2 type2 88.53 3.025 0.00697
A 841 cat2 type2 85.84 2.703 0.00697
理想情况下,我想按Group
、Process
、Category
和Type
进行分组,并使用四个定义的函数对Var1
、Var2
和Var3
进行一些计算,其中三个也包含循环for
。实现output
如下:
Group Type Process Category Var1 Var2 Var3
0 A type1 3 cat1 101.207332 13.997181 106.30899
1 A type2 841 cat2 87.431341 3.584393 106.30899
实现的完整代码如下:
import pandas as pd
import numpy as np
from dplython import X, sift, DplyFrame, mutate, select
from plydata import define, group_by, summarize
def weightedMean(data):
length = len(data['Var1'])
if length == 1:
mx = data['Var1']
return(length)
else:
mx = data['Var1'][0]
nx = data['Var3'][0]
for i in range(1,length):
my = data['Var1'][i]
ny = data['Var3'][i]
nx = nx + ny
mx=(mx*nx+my*ny)/(nx+ny)
return(mx)
def summation(data):
length = len(data['Var3'])
cx = data['Var3'][0]
for i in range(1,length):
cy = data['Var3'][i]
cx = cx + cy
return(cx)
def sd_c(x_m, x_s, x_n, y_m, y_s, y_n):
al = x_n+y_n
tmp_sd = al*((x_n-1)*(x_s*x_s)+(y_n-1)*(y_s*y_s))+y_n*x_n*(x_m-y_m)*(x_m-y_m)
var = tmp_sd/(al*(al-1))
std = np.sqrt(var)
return(std)
def sd_pooled(data):
length = len(data['Var1'])
if length == 1:
mx = data['Var1']
return(length)
else:
mx = data['Var1'][0]
sx = data['Var2'][0]
nx = data['Var3'][0]
for i in range(1,length):
my = data['Var1'][i]
sy = data['Var2'][i]
ny = data['Var3'][i]
sx = sd_c(mx, sx, nx, my, sy, ny)
nx = nx + ny
mx = (mx*nx + my*ny)/(nx + ny)
return(sx)
dat = pd.read_csv("input.txt",sep="t")
dat_name = dat.loc[:,'Type'].unique()
dat = DplyFrame(dat)
out = pd.DataFrame([])
for i in range(len(dat_name)):
df = (dat >>
sift(X.Type == dat_name[i]) >>
mutate(Var3 = X.Var3*3021) >>
sift(X.Var2 < 50))
out = out.append(df)
out_grouped = out.groupby(['Group', 'Type', 'Process', 'Category'])
init = []
mean = []
stdv = []
freq = []
kmer = []
for name, group in out_grouped:
group = pd.DataFrame(group).reset_index()
nm = name
wm = weightedMean(group)
sd = sd_pooled(group)
fq = summation(group)
init.append(nm)
mean.append(wm)
freq.append(fq)
stdv.append(sd)
init = pd.DataFrame(init)
mean = pd.DataFrame(mean)
freq = pd.DataFrame(freq)
stdv = pd.DataFrame(stdv)
init.rename(columns={0:'Group',1:'Type',2:'Process',3:'Category'}, inplace=True)
mean.rename(columns={0:'Var1'}, inplace=True)
stdv.rename(columns={0:'Var2'}, inplace=True)
freq.rename(columns={0:'Var3'}, inplace=True)
output = pd.concat([init.reset_index(drop=True), mean, stdv, freq], axis=1)
在这种情况下,如何使用多核应用并行处理?提前谢谢。
这里有很多东西要解开,所以请耐心等待。
我已将数据集存储到名为dat
的熊猫数据帧中
Group Process Category Type Var1 Var2 Var3
0 A 3 cat1 type1 86.84 2.913 0.01096
1 A 3 cat1 type1 103.39 2.835 0.00564
2 A 3 cat1 type1 109.00 1.478 0.00365
3 A 3 cat1 type1 107.30 2.979 0.00631
4 A 3 cat1 type1 123.09 2.424 0.00531
5 A 3 cat1 type1 111.98 7.462 0.00332
6 A 841 cat2 type2 87.62 3.049 0.01195
7 A 841 cat2 type2 87.40 4.781 0.00930
8 A 841 cat2 type2 88.53 3.025 0.00697
9 A 841 cat2 type2 85.84 2.703 0.00697
下一步是按Process
、Category
和Type
对表进行分组(我现在省略了Group
,因为它似乎只有 1 个唯一值Group
)
我使用了.sum()
因为这就是您所说的您希望在分组期间聚合Var
变量的方式。
out_grouped = dat.groupby(['Type', 'Process', 'Category']).sum()
这是分组后数据帧的外观 -
Var1 Var2 Var3
Type Process Category
type1 3 cat1 641.60 20.091 0.03519
type2 841 cat2 349.39 13.558 0.03519
下一步是要在此聚合的每一行上应用函数 -
必须将函数设计为仅依赖于数据帧的每个原子行。
加权均值函数不应窥视其他行进行计算。如果确实有要求,请将这些值作为预处理步骤在数据帧的每一行中可用。
def weightedMean(data):
l = data['Var1']
m = data['Var2']
n = data['Var3']
wm = l*m*n/l+m+n
return wm
最后一步是对数据帧的每一行应用weightedMean
函数。
这是如何在单个内核上执行此操作-
out_grouped["weighted_mean"] = out_grouped.apply(weightedMean, axis=1)
要跨多个内核分配此计算 - 您可以使用 DASK API。
附言我很确定我在解释您的问题陈述的方式上犯了一些错误。所以请随时纠正我,我会适当地修改代码。
这是应用函数后数据帧的外观 -
Var1 Var2 Var3 weighted_mean
Type Process Category
type1 3 cat1 641.60 20.091 0.03519 20.833192
type2 841 cat2 349.39 13.558 0.03519 14.070296