我有一个字典,其中每个键(日期(包含一个表(格式[day1, val11, val21], [day2, va12, val22], [day3, val13, val23], ...
的多个列表。我想将其转换为数据帧;这是使用以下代码完成的:
df4 = pd.DataFrame(columns=sorted(set_days))
for date in dic.keys():
days = [day for day, val1, val2 in dic[date]]
val1 = [val1 for day, val1, val2 in dic[date]]
df4.loc[date, days] = val1
此代码工作正常,但需要两个多小时才能运行。 经过一些研究,我意识到我可以通过multiprocessing
库并行化它;以下代码是预期的并行版本
import multiprocessing
def func(date):
global df4, dic
days = [day for day, val1, val2 in dic[date]]
val1 = [val1 for day, val1, val2 in dic[date]]
df4.loc[date, days] = val1
multiprocessing.Pool(processes=8).map(func, dic.keys())
此代码的问题在于,在执行multiprocessing.Pool(processes...
后,df4
数据帧为空。
任何帮助将不胜感激。
例
假设字典包含两天:
dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]
dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]
则数据帧的格式应为:
df4.loc[:, 1:50]
1 2 3 4 5 ... 46 47 48 49 50
20030812 24.25 NaN NaN NaN NaN ... NaN NaN NaN NaN 22.85
20030813 24.23 NaN NaN NaN NaN ... NaN NaN NaN 22.74 NaN
也
dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])
df1.head().to_dict()
Out:
{1: {'20030812': 24.25, '20030813': 24.23},
2: {'20030812': nan, '20030813': nan},
3: {'20030812': nan, '20030813': nan},
4: {'20030812': nan, '20030813': nan},
5: {'20030812': nan, '20030813': nan},
6: {'20030812': nan, '20030813': nan},
7: {'20030812': nan, '20030813': nan},
8: {'20030812': nan, '20030813': nan},
9: {'20030812': nan, '20030813': nan},
10: {'20030812': nan, '20030813': nan},
11: {'20030812': nan, '20030813': nan},
12: {'20030812': nan, '20030813': nan},
13: {'20030812': nan, '20030813': nan},
14: {'20030812': nan, '20030813': nan},
15: {'20030812': nan, '20030813': nan},
16: {'20030812': nan, '20030813': nan},
17: {'20030812': nan, '20030813': nan},
18: {'20030812': nan, '20030813': nan},
19: {'20030812': nan, '20030813': 23.4},
20: {'20030812': 23.54, '20030813': nan},
21: {'20030812': nan, '20030813': nan},
22: {'20030812': nan, '20030813': nan},
23: {'20030812': nan, '20030813': nan},
24: {'20030812': nan, '20030813': nan},
25: {'20030812': nan, '20030813': nan},
26: {'20030812': nan, '20030813': nan},
27: {'20030812': nan, '20030813': nan},
28: {'20030812': nan, '20030813': nan},
29: {'20030812': nan, '20030813': nan},
30: {'20030812': 23.13, '20030813': 22.97},
31: {'20030812': nan, '20030813': nan},
32: {'20030812': nan, '20030813': nan},
...
为了回答你最初的问题(粗略地说:"为什么df4
数据帧是空的?"(,这不起作用的原因是,当Pool
工作线程启动时,每个工作线程都会继承父级数据的个人写入时复制视图(如果multiprocessing
在具有fork
的类 UNIX 系统上运行,则直接继承,或者通过笨拙的方法在 Windows 上运行时模拟它(。
因此,当每个工作人员执行以下操作时:
df4.loc[date, days] = val1
它改变了工作人员的个人df4
副本;父进程的副本保持不变。
通常,有三种方法可以解决此问题:
更改工作线程函数以返回可在父进程中使用的内容。例如,与其尝试使用
df4.loc[date, days] = val1
执行就地突变,不如返回在父级中执行此操作所需的内容,例如return date, days, val1
,然后将父项更改为:for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()): df4.loc[date, days] = val
这种方法的缺点是,它要求每个返回值都经过酸洗(Python 的序列化版本(,从子级管道传输到父级,并且未被腌制;如果工作线程任务没有做太多工作,特别是如果返回值很大(在这种情况下,似乎是这种情况(,它很容易在序列化和 IPC 上花费比并行性更多的时间。
使用共享对象/内存(在"多处理写入 pandas 数据帧"的回答中演示(。在实践中,这通常不会给你带来多大好处,因为不是基于使用
multiprocessing.sharedctypes
共享的更"原始"ctypes
的东西最终仍然需要将数据从一个进程传输到另一个进程; 不过,基于sharedctypes
的东西可以获得有意义的速度提升,因为一旦映射,共享原始 C 数组的访问速度几乎与本地内存一样快。如果并行化的工作是 I/O 绑定的,或者使用第三方 C 扩展进行 CPU 密集型工作(例如
numpy
(,尽管 GIL 干扰,您也许能够从线程中获得所需的速度提升,并且线程确实共享相同的内存。您的情况似乎既不受 I/O 约束,也没有有意义地依赖于可能释放 GIL 的第三方 C 扩展,因此在这里可能无济于事,但一般来说,从基于进程的并行性切换到基于线程的并行性的简单方法(当您已经在使用multiprocessing
时(是将import
从:import multiprocessing
自
import multiprocessing.dummy as multiprocessing
它将线程支持的
multiprocessing
版本导入到预期名称下,因此代码可以从使用进程无缝切换到线程。
正如RafaelC所暗示的那样,这是一个XY问题。 我已经能够在没有多处理的情况下将执行时间减少到 20 秒。
我创建了一个替换字典的列表,并且,一旦列表已满,我就将列表转换为数据帧,而不是为每个日期向 df4 数据帧添加一行。
# Returns the largest day from all the dates (each date has a different number of days)
def longest_series(dic):
largest_series = 0
for date in dic.keys():
# get the last day's table of a specific date
current_series = dic[date][-1][0]
if largest_series < current_series:
largest_series = current_series
return largest_series
ls = longest_series(dic)
l_total_days = list(range(1, ls+1))
s_total_days = set(l_total_days)
# creating lista list, lista is similar to dic
#The difference is that, in lista, every date has the same number of days
#i.e. from 1 to ls, and it does not contain the dates.
# It takes 15 seconds
lista = list()
for date in dic.keys():
present_days = list()
presen_values = list()
for day, val_252, _ in dic[date]:
present_days.append(day)
presen_values.append(val_252)
missing_days = list(s_total_days.difference(set(present_days))) # extra days added to date
missing_values = [None] * len(missing_days) # extra values added to date
all_days_index = list(np.argsort(present_days + missing_days)) # we need to preserve the order between days and values
all_day_values = presen_values + missing_values
lista.append(list(np.array(all_day_values)[all_days_index]))
# It takes 4 seconds
df = pd.DataFrame(lista, index= dic.keys(), columns=l_total_days)