Selecting a part of a dataframe in parallel each time

I want to create a dictionary in a loop.

Since, at each iteration, I only take a part of the initial dataframe (df_train = df[df['CLASS'] == oneClass]), I would like to parallelize it.

My code is:

import pandas as pd
import numpy as np
from multiprocessing import Pool

df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data_cols = df_train.select_dtypes(include=np.number).columns.difference(['CLASS'])
    numeric_only_data = df_train[numeric_only_data_cols]
    X = numeric_only_data.values
    x = X * 100

    orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns

    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]['CLASS'] = df_train['CLASS']

    return new_df

new_df = {}
classes = np.unique(df['CLASS'])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, classes):
        new_df['new_dataframe'] = new_dataframe
    pool.close()
    pool.join()

This is the for loop I dropped from the function:

new_df = {}
for oneClass in classes:
    df_train = df[df['GROUP_DESC'] == oneClass]
    ...

Now, I'm getting:

make_dataframes() missing 1 required positional argument: 'oneClass'

I don't know how to pass the function's arguments, and I don't know whether classes is a valid argument for map.

Are you planning on executing your code inside a cluster? If not, then you're probably better off executing your code the old-fashioned single-process way. There's a great talk on the subject by Raymond Hettinger that I find quite interesting, and I recommend checking it out: Raymond Hettinger, Keynote on Concurrency, PyBay 2017.

Having said that, a straightforward solution to implement would be to define a single parameter as the input of make_dataframes, representing a tuple of df and oneClass:

import pandas as pd
import numpy as np
from multiprocessing import Pool

def make_dataframes(args):
    new_df = {}
    df = args[0]        # <--- Unpacking values
    oneClass = args[-1] # <--- Unpacking values
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[:, lambda xdf: xdf.columns.difference(['CLASS'])]
    X = numeric_only_data.values
    x = X * 100

    orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns
    # Keep df_train's index so the CLASS assignment below aligns row-wise
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns, index=df_train.index)
    new_df[oneClass]['CLASS'] = df_train['CLASS']
    return new_df

df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
new_df = {}
classes = np.unique(df["CLASS"])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, zip([df]*len(classes), classes)):
        new_df[list(new_dataframe.keys())[0]] = list(new_dataframe.values())[0]
    pool.close()
    pool.join()
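
As a side note on the design: instead of zipping df to every class and unpacking the tuple inside the function, multiprocessing's Pool.starmap unpacks argument tuples for you, so the original two-parameter signature can be kept. A minimal sketch, assuming the two-argument make_dataframes(df, oneClass) from the question:

from multiprocessing import Pool

with Pool(4) as pool:
    # starmap unpacks each (df, oneClass) tuple into two positional arguments
    results = pool.starmap(make_dataframes, [(df, c) for c in classes])

# Merge the per-class single-entry dicts into one dictionary
new_df = {key: value for d in results for key, value in d.items()}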

A second approach would be to use the Joblib package instead of multiprocessing, like so:

import pandas as pd
import numpy as np
from joblib import Parallel, delayed

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    # Keep df_train's index so the CLASS assignment below aligns row-wise
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns, index=df_train.index)
    new_df[oneClass]["CLASS"] = df_train["CLASS"]
    return new_df

df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {
    key: value
    for parallel in Parallel(n_jobs=4)(
        delayed(make_dataframes)(df, i) for i in classes
    )
    for key, value in parallel.items()
}
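
For reference, the merged dictionary is keyed by class name; with the toy df above, inspecting one entry would look roughly like this (output shown as comments):

print(sorted(new_df.keys()))  # ['A', 'B', 'C']
print(new_df['A'])
#    a    b    c CLASS
# 0  0  300  600     A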

Finally, if you're not planning on running this code inside a power-hungry cluster, and you need to extract all the juice you can get out of it, the approach I'd recommend you use is:

import pandas as pd
import numpy as np

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    # Keep df_train's index so the CLASS assignment below aligns row-wise
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns, index=df_train.index)
    new_df[oneClass]["CLASS"] = df_train["CLASS"]
    return new_df

df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {c: make_dataframes(df, c)[c] for c in classes}

For comparison's sake, I recorded the execution time of each approach:

  • multiprocessing: CPU times: user 13.6 ms, sys: 41.1 ms, total: 54.7 ms. Wall time: 158 ms
  • joblib: CPU times: user 14.3 ms, sys: 0 ns, total: 14.3 ms. Wall time: 16.5 ms
  • Serial processing: CPU times: user 14.1 ms, sys: 797 µs, total: 14.9 ms. Wall time: 14.9 ms
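
These figures look like IPython %time output; in a plain script, a comparable wall-time measurement can be taken with time.perf_counter. A minimal sketch for the serial variant (the other two are timed the same way):

import time

start = time.perf_counter()
new_df = {c: make_dataframes(df, c)[c] for c in classes}
print(f"Wall time: {time.perf_counter() - start:.4f} s")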

Running things in parallel adds a lot of overhead in communication costs between the different processing nodes. Besides, it is an intrinsically more complex task than running things serially; consequently, developing and maintaining the code becomes harder and more expensive. If running things in parallel is the number one priority, I would recommend first ditching Pandas and using PySpark or Dask instead.
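
To illustrate that last point, here is a minimal Dask sketch of the numeric scaling step, without the per-class split (assuming the dask package is installed; npartitions=4 is an arbitrary choice, and scale_numeric is a hypothetical helper, not part of the original post):

import dask.dataframe as dd

def scale_numeric(part):
    # Runs on each pandas partition: multiply every column except CLASS by 100
    out = part.copy()
    num_cols = out.columns.difference(['CLASS'])
    out[num_cols] = out[num_cols] * 100
    return out

ddf = dd.from_pandas(df, npartitions=4)
result = ddf.map_partitions(scale_numeric).compute()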
