I am trying to figure out how to run a large problem on multiple cores. I am struggling with splitting a dataframe up into different processes.
I have a class as follows:
class Pergroup():
    def __init__(self, groupid):
        ...
    def process_datapoint(self, df_in, group):
        ...
My data is a time series, and contains events that can be grouped using the groupid column. I create an instance of the class for each group like this:
for groupname in df_in['groupid'].unique():
    instance_names.append(groupname)
holder = {name: Pergroup(name) for name in instance_names}
Now, for each timestamp in the dataframe, I want to call the corresponding instance (based on the group) and pass it the dataframe at that timestamp.
I have tried the following, which does not seem to parallelize as I expect:
for val in range(0, len(df_in)):
    current_group = df_in['groupid'][val]
    current_df = df_in.ix[val]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(holder[current_group].process_datapoint, current_df, current_group)
I have also tried using this, which splits the df into its columns when calling the instances:
Parallel(n_jobs=-1)(map(delayed(holder[current_group].process_datapoint), current_df, current_group))
How should I split up the dataframe so that I can still call the right instance with the right data? Basically, I am attempting to run a loop as below, with the last line running in parallel:
for val in range(0, len(df_in)):
    current_group = df_in['groupid'][val]
    current_df = df_in.ix[val]
    holder[current_group].process_datapoint(current_df, current_group) #This call should be initiated in as many cores as possible.
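For reference, here is a minimal self-contained serial version of what I am trying to speed up (the toy Pergroup body, the example dataframe, and the 'value' column are placeholders; my real class does more work per call):

```python
import pandas as pd

# Toy stand-in for my real class: it just counts the rows it has seen.
class Pergroup:
    def __init__(self, groupid):
        self.groupid = groupid
        self.seen = 0

    def process_datapoint(self, df_in, group):
        self.seen += 1
        return self.seen

df_in = pd.DataFrame({'groupid': ['a', 'a', 'b'],
                      'value': [1.0, 2.0, 3.0]})
holder = {name: Pergroup(name) for name in df_in['groupid'].unique()}

# Serial loop: dispatch each row to the instance matching its group.
for _, row in df_in.iterrows():
    holder[row['groupid']].process_datapoint(row, row['groupid'])
```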
A slightly different approach, using a Pool:
import pandas as pd
from multiprocessing import Pool

# First make sure each process has its own data
groups = df_in['groupid'].unique()
data = [(group_id, holder[group_id], df_in[df_in['groupid'] == group_id])
        for group_id in groups]

# Prepare a function that can take this data as input
def help_func(args):
    current_group, holder_instance, current_df = args
    return holder_instance.process_datapoint(current_df, current_group)

# Run in parallel
with Pool(processes=4) as p:
    p.map(help_func, data)
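A self-contained sketch of this pattern, with a toy Pergroup (the class body and the 'value' column are made up for illustration). One caveat: each worker receives a pickled copy of its instance, so state mutated inside process_datapoint is not reflected back in the parent's holder; return results instead.

```python
import pandas as pd
from multiprocessing import Pool

# Toy stand-in for Pergroup; defined at module level so it can be pickled.
class Pergroup:
    def __init__(self, groupid):
        self.groupid = groupid

    def process_datapoint(self, df_in, group):
        # Hypothetical work: sum this group's 'value' column.
        return group, df_in['value'].sum()

def help_func(args):
    group_id, instance, group_df = args
    return instance.process_datapoint(group_df, group_id)

def run_parallel(df_in, processes=2):
    holder = {name: Pergroup(name) for name in df_in['groupid'].unique()}
    # One (group_id, instance, sub-dataframe) tuple per group.
    data = [(g, holder[g], df_g) for g, df_g in df_in.groupby('groupid')]
    with Pool(processes=processes) as p:
        return dict(p.map(help_func, data))

if __name__ == '__main__':
    df = pd.DataFrame({'groupid': ['a', 'a', 'b'],
                       'value': [1.0, 2.0, 3.0]})
    print(run_parallel(df))
```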
I ran into a similar problem at some point; since I could not adapt it completely to your problem, I hope you can transpose it and make it fit yours:
import math
import multiprocessing
import pandas as pd
from joblib import Parallel, delayed

maxbatchsize = 10000   # limit the amount of data dispatched to each core
ncores = -1            # number of cores to use
data = pd.DataFrame()  # <<<- your dataframe

class DFconvoluter():
    def __init__(self, myparam):
        self.myparam = myparam
    def __call__(self, df):
        return df.apply(lambda row: row['somecolumn'] * self.myparam, axis=1)

nbatches = max(math.ceil(len(data) / maxbatchsize), ncores)
# a vector telling which row should be dispatched to which batch
g = GenStrategicGroups(data['Key'].values, nbatches)

# -- parallel part
def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=ncores)(delayed(func)(group) for _, group in dfGrouped)
    return pd.concat(retLst)

out = applyParallel(data.groupby(g), DFconvoluter(42))
What remains is to write how you want to group the batches together; for me, this had to be done in such a way that rows whose values in the 'Key' column are similar stay together:
def GenStrategicGroups(stratify, ngroups):
    ''' Generate a list of integers in a grouped sequence,
    where grouped levels in stratify are preserved.
    '''
    g = []
    nelpg = float(len(stratify)) / ngroups
    prev_ = None
    grouped_idx = 0
    for i, s in enumerate(stratify):
        if i > (grouped_idx + 1) * nelpg:
            if s != prev_:
                grouped_idx += 1
        g.append(grouped_idx)
        prev_ = s
    return g
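To sanity-check the batching, here is a small end-to-end run (the function is repeated so the snippet stands alone; the 'Key' values are toy data). Note that because runs of equal keys are never split, you may end up with fewer batches than requested:

```python
import pandas as pd

def GenStrategicGroups(stratify, ngroups):
    '''Generate batch indices so that runs of equal keys stay in one batch.'''
    g = []
    nelpg = float(len(stratify)) / ngroups
    prev_ = None
    grouped_idx = 0
    for i, s in enumerate(stratify):
        if i > (grouped_idx + 1) * nelpg:
            if s != prev_:
                grouped_idx += 1
        g.append(grouped_idx)
        prev_ = s
    return g

# Six sorted keys, 3 requested batches.
data = pd.DataFrame({'Key': ['a', 'a', 'b', 'b', 'c', 'c'],
                     'somecolumn': [1, 2, 3, 4, 5, 6]})
g = GenStrategicGroups(list(data['Key'].values), 3)
print(g)  # → [0, 0, 0, 0, 1, 1]

# Each batch can then be dispatched to a worker via data.groupby(g).
for batch_id, batch in data.groupby(g):
    print(batch_id, list(batch['Key']))
```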