将函数应用于具有多处理功能的DataFrames时,设置新的列名



我有一个函数,它接收Pandas系列,并返回一个DataFrame,其中包含对原始系列的一些计算,并且具有与原始系列名称不同的列名:

def vel_acc(series, n=1, change_names=False):
vel = series.diff(n) / n
acc = vel.diff()
return pd.concat([vel, acc], axis=1)

第二个版本更改了输出列名

def vel_acc_change_col_names(series, n=1):
vel = series.diff(n) / n
acc = vel.diff()
vel.name = "VEL_{} ({})".format(str(n), series.name)
acc.name="ACC_{} ({})".format(str(n), series.name)
return pd.concat([vel, acc], axis=1)

我想把它应用到一个大的DataFrame上,它的形状是~(30000400)。因此,我编写了另一个函数来使用多处理,并将计算扩展到CPU的8个核心,基于此:

from multiprocessing import Pool, cpu_count
def process_Pandas_data(func, df, num_processes=None):
if num_processes==None:
num_processes = min(df.shape[1], cpu_count())
with Pool(num_processes) as 
seq = np.array_split(df, num_processes, axis=1)
results_list = pool.map(func, seq)
return pd.concat(results_list, axis=1)

我正在处理时间序列价格数据,格式为

Close    High     Low    Open
Date                                               
2013-08-25 08:00:00  164.36  164.36  123.51  144.02
2013-08-25 09:00:00  127.40  165.75  127.40  164.36
2013-08-25 10:00:00  111.83  127.40  111.83  127.40
2013-08-25 11:00:00  121.35  121.35  111.83  111.83
2013-08-25 12:00:00  121.44  129.18  113.88  121.35
2013-08-25 13:00:00  133.10  133.10  113.80  121.44
2013-08-25 14:00:00  133.10  133.10  133.10  133.10
2013-08-25 15:00:00  121.47  133.10  107.66  133.10
2013-08-25 16:00:00  127.58  139.58  121.47  121.47
2013-08-25 17:00:00  151.22  164.52  127.58  127.58

当我进行类似的函数调用时

process_Pandas_data(vel_acc, res.iloc[:, :4]).tail()

结果是

Close  Close   High   High    Low    Low   Open   Open
Date                                                                       
2013-08-25 13:00:00  11.66  11.57   3.92  -3.91  -0.08  -2.13   0.09  -9.43
2013-08-25 14:00:00   0.00 -11.66   0.00  -3.92  19.30  19.38  11.66  11.57
2013-08-25 15:00:00 -11.63 -11.63   0.00   0.00 -25.44 -44.74   0.00 -11.66
2013-08-25 16:00:00   6.11  17.74   6.48   6.48  13.81  39.25 -11.63 -11.63
2013-08-25 17:00:00  23.64  17.53  24.94  18.46   6.11  -7.70   6.11  17.74

当我取消注释vel_acc中为函数结果设置新名称的两行时,问题就出现了。我希望列名的形式为VEL_1(关闭)、ACC_1(关闭)。。。。当调用process_Pandas_data(vel_acc,res..)时,它会返回以下错误:

RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
File "/home/xoel/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/home/xoel/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "<ipython-input-94-1a5f8ee15a6a>", line 5, in vel_acc
vel.name = "VEL_{} ({})".format(str(n), series.name)
File "/home/xoel/anaconda3/envs/tensorflow/lib/python3.5/site-packages/pandas/core/generic.py", line 3081, in __getattr__
return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'name'
"""
The above exception was the direct cause of the following exception:
AttributeError                            Traceback (most recent call last)
<ipython-input-97-5d13d64e3941> in <module>()
----> 1 process_Pandas_data(vel_acc, res.iloc[:, :4]).tail()
<ipython-input-37-6e37653b2e53> in process_Pandas_data(func, df, num_processes)
11         seq = np.array_split(df, num_partitions, axis=1)
12 
---> 13         results_list = pool.map(func, seq)
14 
15         return pd.concat(results_list, axis=1)
~/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
264         in a list that is returned.
265         '''
--> 266         return self._map_async(func, iterable, mapstar, chunksize).get()
267 
268     def starmap(self, func, iterable, chunksize=None):
~/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
642             return self._value
643         else:
--> 644             raise self._value
645 
646     def _set(self, i, obj):
AttributeError: 'DataFrame' object has no attribute 'name'

那么,有没有一种方法可以将函数传递给多处理。在熊猫身上活动的水池。DataFrames为输出DataFrames设置新名称?

问题是np.array_split(df, num_processes, axis=1)创建的是DataFrame对象列表,而不是Series对象列表。DataFrame对象没有name属性,这就是获得AttributeError的原因。

您应该更改seq的定义,以确保它包含Series的实例,例如:

seq = [df[column_name] for column_name in df]

这应该避免AttributeError

相关内容

  • 没有找到相关文章

最新更新