对数据帧集合使用多处理



设置

我有多个数据集,每个数据集都有自己的DataFrame。在将我的结果与单独的DataFrame进行比较之前,我正在其中运行计算,我们可以将其视为约束。

例如,假设字典中有两组数据:

df_data_1 = pd.DataFrame(np.random.randint(0,50,size=(10, 4)), columns=list('ABCD'))
df_data_2 = pd.DataFrame(np.random.randint(0,50,size=(10, 4)), columns=list('ABCD'))

data_sets = {'data_1': df_data, 'data_2': df_data_2}

以及一组约束条件:

df_constraints = pd.DataFrame([['a', 10, 20, 10000000], 
['b', 100, 200, 20000000], 
['c', 1000, 2000, 30000000]])
df_constraints.columns = ['index', 'sumMin', 'sumMax', 'productMax']
df_constraints.set_index('index', inplace=True)

直观:
data_set_1
data_set_2
约束

功能

我在每组数据中进行计算,然后将它们与一组约束条件进行比较。为了简化我的问题,我只是将数据与第一行约束进行比较,但实际上,我必须将每个数据集中的计算结果与多达20组约束进行比较。

以下是我试图并行运行的函数的简化版本:

def test_func(df_data, df_constraints):
# Run some calculations
df = df_data.copy()
df['sum'] = df.sum(axis=1)
df['product'] = df.product(axis=1)

# Compare results to constraints
df['sumFit'] = ((df['sum'] > df_constraints.loc['a', 'sumMin']) &
(df['sum'] < df_constraints.loc['a', 'sumMax'])) 
df['productFit'] = df['product'] < df_constraints.loc['a', 'productMax']

# Analyze results
count_sumFits = df['sumFit'].sum()
count_productFits = df['productFit'].sum()

df_results = pd.DataFrame([['data_set_1', count_sumFits, count_productFits]], 
columns=['DataSet', 'FittingSums', 'FittingProducts'])
df_results.set_index('DataSet', inplace=True)

return df_results

顺序版本

我可以在每组数据中依次运行此函数;使用while循环迭代字典,然后如图所示附加结果,但随着复杂性的增加,这需要比我希望的更长的时间。(它很难看,但很管用(

n=0
while n < len(data_sets):
data_set_names = list(data_sets.keys())
df_temp = test_func(data_sets[data_set_names[n]], df_constraints)

df_all_results.loc[n, 'FittingSums']     = df_temp.loc[0, 'FittingSums']
df_all_results.loc[n, 'FittingProducts'] = df_temp.loc[0, 'FittingProducts']
n+=1

问题

当我有25个数据集,并且我用更多的计算来运行更复杂的分析时,运行时间最终会长达几分钟。这让我开始追求并发/多处理。我希望这能明显加快速度,因为这是我试图优化的许多步骤中的一步,然后运行数千次。

所以,多处理器

由于需要向函数传递两个参数,我一直在研究mp.Pool.starmappool.map(partial(test_func, b=df_constraints), data_sets,但这两种方法都无法正常工作。

例1(mp.Pool.starmap

if __name__ == '__main__':
pool = mp.Pool(processes = 8)
output = pool.starmap(test_file.test_func, zip(data_sets, itertools.repeat(df_contraints)

这是我所能做到的。有可能像这样同时处理数据,然后将结果附加到数据帧中吗?我不需要它们按任何特定的顺序排列,我只想将数据转换成正确的格式。

我不完全理解您的代码和逻辑,但用data_sets.values():替换了data_sets

if __name__ == '__main__':
pool = mp.Pool(processes = 8)
output = pool.starmap(test_file.test_func, zip(data_sets.values(),itertools.repeat(df_contraints)))

相关内容

  • 没有找到相关文章

最新更新