I want to create dictionaries in a loop.
Since each iteration takes only a slice of the initial dataframe (df_train = df[df['CLASS'] == oneClass]), I would like to parallelize it.
My code is:
import pandas as pd
import numpy as np
from multiprocessing import Pool

df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data_cols = df_train.select_dtypes(include=np.number).columns.difference(['CLASS'])
    numeric_only_data = df_train[numeric_only_data_cols]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[:,
        numeric_only_data.columns != 'CLASS'].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]['CLASS'] = df_train['CLASS']
    return new_df

new_df = {}
classes = np.unique(df['CLASS'])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, classes):
        new_df['new_dataframe'] = new_dataframe
    pool.close()
    pool.join()
I left the for loop out of the function:
new_df = {}
for oneClass in classes:
    df_train = df[df['GROUP_DESC'] == oneClass]
    ...
Now I get:
make_dataframes() missing 1 required positional argument: 'oneClass'
I don't know how to pass the function's arguments, and I don't know whether classes is a valid argument for map.
Are you planning to execute the code in a cluster? If not, you are probably better off executing the code the good old single-process way. Raymond Hettinger has an excellent talk on the subject that I find rather interesting, and I recommend checking it out: Raymond Hettinger, Keynote on Concurrency, PyBay 2017.
That being said, one easy fix is to define a single argument as the input to make_dataframes, representing a tuple of df and oneClass:
import pandas as pd
import numpy as np
from multiprocessing import Pool

def make_dataframes(args):
    new_df = {}
    df = args[0]         # <--- Unpacking values
    oneClass = args[-1]  # <--- Unpacking values
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[:, lambda xdf: xdf.columns.difference(['CLASS'])]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    # .values avoids index misalignment: the new frame has a fresh RangeIndex,
    # while df_train keeps the original row labels.
    new_df[oneClass]['CLASS'] = df_train['CLASS'].values
    return new_df

df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
new_df = {}
classes = np.unique(df["CLASS"])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, zip([df] * len(classes), classes)):
        new_df.update(new_dataframe)
    pool.close()
    pool.join()
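As a side note (not part of the snippet above), the manual tuple unpacking can be avoided entirely with Pool.starmap, which splats each argument tuple into positional arguments; a minimal sketch, assuming the original two-parameter make_dataframes(df, oneClass):

from multiprocessing import Pool

# Pool.starmap unpacks each (df, oneClass) tuple into positional arguments,
# so make_dataframes can keep its original two-parameter signature.
new_df = {}
with Pool(4) as pool:
    for new_dataframe in pool.starmap(make_dataframes, [(df, c) for c in classes]):
        new_df.update(new_dataframe)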
A second approach is to use the joblib package instead of multiprocessing, as follows:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]["CLASS"] = df_train["CLASS"].values  # .values avoids index misalignment
    return new_df

df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {
    key: value
    for parallel in Parallel(n_jobs=4)(
        delayed(make_dataframes)(df, i) for i in classes
    )
    for key, value in parallel.items()
}
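If you go the joblib route, the backend can also be switched without touching the function. As a hedged variation (prefer="threads" is a real joblib parameter, but whether threads actually help depends on the workload), a thread-based run avoids pickling df once per task:

# prefer="threads" uses a thread pool instead of worker processes, so df is
# shared in memory rather than pickled per task; this is only a sketch, and
# worthwhile mainly for cheap tasks where process startup dominates.
new_df = {
    key: value
    for result in Parallel(n_jobs=4, prefer="threads")(
        delayed(make_dataframes)(df, c) for c in classes
    )
    for key, value in result.items()
}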
Finally, if you are not planning to run this code inside a power-hungry cluster and you need to extract every bit of performance out of it, the approach I would recommend is:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]["CLASS"] = df_train["CLASS"].values  # .values avoids index misalignment
    return new_df

df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {c: make_dataframes(df, c)[c] for c in classes}
For comparison's sake, I recorded the execution time of each approach:
multiprocessing:    CPU times: user 13.6 ms, sys: 41.1 ms, total: 54.7 ms. Wall time: 158 ms
joblib:             CPU times: user 14.3 ms, sys: 0 ns, total: 14.3 ms. Wall time: 16.5 ms
Serial processing:  CPU times: user 14.1 ms, sys: 797 µs, total: 14.9 ms. Wall time: 14.9 ms
Running things in parallel adds a lot of communication overhead between the different processing nodes. Besides, it is an intrinsically more complex task than running serially, so developing and maintaining the code becomes exponentially harder and more expensive. If running things in parallel is a top priority, I would recommend first ditching Pandas and using PySpark or Dask instead.
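For illustration only, here is a minimal sketch of the same per-class transform in Dask (assuming dask[dataframe] is installed; dd.from_pandas, map_partitions, and compute are standard Dask calls, while the partition count of 2 is an arbitrary choice for this toy frame):

import pandas as pd
import numpy as np
import dask.dataframe as dd

def scale_numeric(pdf):
    # Runs on each pandas partition; multiplies the numeric columns by 100.
    out = pdf.copy()
    num_cols = out.select_dtypes(include=np.number).columns
    out[num_cols] = out[num_cols] * 100
    return out

df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
ddf = dd.from_pandas(df, npartitions=2)               # split into lazy partitions
result = ddf.map_partitions(scale_numeric).compute()  # parallel transform, back to pandas
new_df = {c: g.reset_index(drop=True) for c, g in result.groupby('CLASS')}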