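I have a function that takes a pandas Series and returns a DataFrame with some calculations on the original series. I want the output columns to have names different from the original series name. The first version keeps the original name: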
import pandas as pd

def vel_acc(series, n=1, change_names=False):
    # change_names is not used in this version
    vel = series.diff(n) / n
    acc = vel.diff()
    return pd.concat([vel, acc], axis=1)
A second version changes the output column names:
def vel_acc_change_col_names(series, n=1):
    vel = series.diff(n) / n
    acc = vel.diff()
    vel.name = "VEL_{} ({})".format(str(n), series.name)
    acc.name = "ACC_{} ({})".format(str(n), series.name)
    return pd.concat([vel, acc], axis=1)
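A quick way to see where the renaming can break is to call the second version directly, once with a Series and once with a one-column DataFrame. The sketch below restates `vel_acc_change_col_names` from above and uses the first few Close prices from the sample data further down; the variable names `prices` and `ok` are only for illustration.

```python
import pandas as pd


def vel_acc_change_col_names(series, n=1):
    # Same logic as the second version above.
    vel = series.diff(n) / n
    acc = vel.diff()
    vel.name = "VEL_{} ({})".format(str(n), series.name)
    acc.name = "ACC_{} ({})".format(str(n), series.name)
    return pd.concat([vel, acc], axis=1)


# First few Close prices from the sample data in the question.
prices = pd.DataFrame({"Close": [164.36, 127.40, 111.83, 121.35, 121.44]})

# Passing a Series works: .name exists, so the new column names are used.
ok = vel_acc_change_col_names(prices["Close"])
print(list(ok.columns))  # ['VEL_1 (Close)', 'ACC_1 (Close)']

# Passing a one-column DataFrame fails when `series.name` is evaluated.
try:
    vel_acc_change_col_names(prices[["Close"]])
except AttributeError as exc:
    print("AttributeError:", exc)
```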
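I want to apply it to a large DataFrame of shape ~(30000, 400). So I wrote another function that uses multiprocessing to spread the computation over the 8 cores of my CPU, based on this: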
import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count

def process_Pandas_data(func, df, num_processes=None):
    if num_processes is None:
        num_processes = min(df.shape[1], cpu_count())
    with Pool(num_processes) as pool:
        # split the columns into num_processes groups, one task per group
        seq = np.array_split(df, num_processes, axis=1)
        results_list = pool.map(func, seq)
    return pd.concat(results_list, axis=1)
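To see what `pool.map` actually hands to `func`, the same column split can be done without a pool. This sketch is a stand-in for `np.array_split(df, k, axis=1)`: it splits the column *positions* with `np.array_split` and selects each group with `iloc`, giving the same grouping of columns. `split_columns` is a hypothetical helper name, and the data is the column layout of the question's price data with values from its first two rows.

```python
import numpy as np
import pandas as pd


def split_columns(df, k):
    """Hypothetical helper: split df's columns into k groups, like
    np.array_split(df, k, axis=1), using explicit column positions."""
    groups = np.array_split(np.arange(df.shape[1]), k)
    return [df.iloc[:, positions] for positions in groups]


df = pd.DataFrame(
    {"Close": [164.36, 127.40], "High": [164.36, 165.75],
     "Low": [123.51, 127.40], "Open": [144.02, 164.36]}
)

chunks = split_columns(df, 4)
for chunk in chunks:
    # Each chunk is a one-column DataFrame, not a Series.
    print(type(chunk).__name__, list(chunk.columns))
```

Even with one column per chunk, `func` receives a DataFrame, so anything that relies on Series-only attributes will fail inside the worker.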
I'm working with time-series price data in this format:
                      Close    High     Low    Open
Date
2013-08-25 08:00:00  164.36  164.36  123.51  144.02
2013-08-25 09:00:00  127.40  165.75  127.40  164.36
2013-08-25 10:00:00  111.83  127.40  111.83  127.40
2013-08-25 11:00:00  121.35  121.35  111.83  111.83
2013-08-25 12:00:00  121.44  129.18  113.88  121.35
2013-08-25 13:00:00  133.10  133.10  113.80  121.44
2013-08-25 14:00:00  133.10  133.10  133.10  133.10
2013-08-25 15:00:00  121.47  133.10  107.66  133.10
2013-08-25 16:00:00  127.58  139.58  121.47  121.47
2013-08-25 17:00:00  151.22  164.52  127.58  127.58
When I call the function like this:
process_Pandas_data(vel_acc, res.iloc[:, :4]).tail()
the result is:
                      Close   Close    High    High     Low     Low    Open    Open
Date
2013-08-25 13:00:00   11.66   11.57    3.92   -3.91   -0.08   -2.13    0.09   -9.43
2013-08-25 14:00:00    0.00  -11.66    0.00   -3.92   19.30   19.38   11.66   11.57
2013-08-25 15:00:00  -11.63  -11.63    0.00    0.00  -25.44  -44.74    0.00  -11.66
2013-08-25 16:00:00    6.11   17.74    6.48    6.48   13.81   39.25  -11.63  -11.63
2013-08-25 17:00:00   23.64   17.53   24.94   18.46    6.11   -7.70    6.11   17.74
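These numbers can be checked against the sample data: for Close at 13:00 the velocity is 133.10 - 121.44 = 11.66, and the acceleration is 11.66 - (121.44 - 121.35) = 11.66 - 0.09 = 11.57. The sketch below reproduces that with the first version of `vel_acc` (restated without the unused `change_names` flag) on the Close values from 11:00 to 13:00. It also shows why the headers repeat: both Series keep the name `Close`, and `pd.concat(..., axis=1)` uses each Series name as a column label.

```python
import pandas as pd


def vel_acc(series, n=1):
    # First version from the question, without the unused change_names flag.
    vel = series.diff(n) / n
    acc = vel.diff()
    return pd.concat([vel, acc], axis=1)


# Close prices from 11:00 to 13:00 in the sample data.
close = pd.Series(
    [121.35, 121.44, 133.10],
    index=pd.to_datetime(["2013-08-25 11:00", "2013-08-25 12:00", "2013-08-25 13:00"]),
    name="Close",
)

out = vel_acc(close)
print(list(out.columns))  # ['Close', 'Close']
print(out.round(2).iloc[-1].tolist())  # [11.66, 11.57]
```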
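The problem appears when I uncomment the two lines in vel_acc that set new names on the function's results (the renaming shown in the second version above). I want the column names to be of the form VEL_1 (Close), ACC_1 (Close), and so on. When I call process_Pandas_data(vel_acc, res..), I get the following error: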
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/xoel/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/home/xoel/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "<ipython-input-94-1a5f8ee15a6a>", line 5, in vel_acc
vel.name = "VEL_{} ({})".format(str(n), series.name)
File "/home/xoel/anaconda3/envs/tensorflow/lib/python3.5/site-packages/pandas/core/generic.py", line 3081, in __getattr__
return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'name'
"""
The above exception was the direct cause of the following exception:
AttributeError Traceback (most recent call last)
<ipython-input-97-5d13d64e3941> in <module>()
----> 1 process_Pandas_data(vel_acc, res.iloc[:, :4]).tail()
<ipython-input-37-6e37653b2e53> in process_Pandas_data(func, df, num_processes)
11 seq = np.array_split(df, num_partitions, axis=1)
12
---> 13 results_list = pool.map(func, seq)
14
15 return pd.concat(results_list, axis=1)
~/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
264 in a list that is returned.
265 '''
--> 266 return self._map_async(func, iterable, mapstar, chunksize).get()
267
268 def starmap(self, func, iterable, chunksize=None):
~/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):
AttributeError: 'DataFrame' object has no attribute 'name'
So, is there a way to pass a function that sets new names on its output DataFrames to a multiprocessing.Pool that works on pandas DataFrames?
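The problem is that np.array_split(df, num_processes, axis=1) creates a list of DataFrame objects, not a list of Series objects. Even a chunk that holds a single column is a one-column DataFrame. DataFrame objects have no name attribute, which is why you get the AttributeError.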
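The difference is easy to check directly: selecting one column by label gives a Series that carries the column label in `.name`, while selecting with a list of labels (which is what a column chunk amounts to) gives a DataFrame without that attribute. The tiny frame below uses values from the question's sample data.

```python
import pandas as pd

df = pd.DataFrame({"Close": [164.36, 127.40], "High": [164.36, 165.75]})

as_series = df["Close"]   # single label -> Series
as_frame = df[["Close"]]  # list of labels -> DataFrame

print(type(as_series).__name__, as_series.name)  # Series Close
print(type(as_frame).__name__, hasattr(as_frame, "name"))  # DataFrame False
```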
You should change the definition of seq so that it contains Series instances, for example:
seq = [df[column_name] for column_name in df]
This should avoid the AttributeError.
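Putting the fix together, here is a minimal sketch, assuming the renaming function and the `process_Pandas_data` wrapper from the question. Two details are choices of this sketch rather than requirements: columns are collected with `df.items()`, which yields one Series per column even if column labels repeat (with duplicate labels, `df[label]` would return a DataFrame again), and names are set with `Series.rename` instead of assigning `.name`. `vel_acc_named` is a hypothetical name. The pool is only started under `if __name__ == "__main__":`, which `multiprocessing` requires on platforms that spawn worker processes.

```python
from multiprocessing import Pool, cpu_count

import pandas as pd


def vel_acc_named(series, n=1):
    """Velocity/acceleration of one Series, with new column names."""
    vel = (series.diff(n) / n).rename("VEL_{} ({})".format(n, series.name))
    acc = vel.diff().rename("ACC_{} ({})".format(n, series.name))
    return pd.concat([vel, acc], axis=1)


def process_Pandas_data(func, df, num_processes=None):
    if num_processes is None:
        num_processes = min(df.shape[1], cpu_count())
    # One Series per column, so func always receives an object with .name.
    seq = [column for _, column in df.items()]
    with Pool(num_processes) as pool:
        # pool.map keeps the input order, so columns stay in df's order.
        results_list = pool.map(func, seq)
    return pd.concat(results_list, axis=1)


if __name__ == "__main__":
    # First three rows of the sample price data.
    prices = pd.DataFrame(
        {"Close": [164.36, 127.40, 111.83], "High": [164.36, 165.75, 127.40],
         "Low": [123.51, 127.40, 111.83], "Open": [144.02, 164.36, 127.40]}
    )
    result = process_Pandas_data(vel_acc_named, prices)
    print(list(result.columns))
    # ['VEL_1 (Close)', 'ACC_1 (Close)', 'VEL_1 (High)', 'ACC_1 (High)',
    #  'VEL_1 (Low)', 'ACC_1 (Low)', 'VEL_1 (Open)', 'ACC_1 (Open)']
```

With about 400 columns this creates one task per column; `pool.map` batches them internally, so the per-task overhead should stay small compared with the work on 30000 rows.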