在 python 中并行化 for 循环



我有一个字典,其中每个键(日期(包含一个表(格式[day1, val11, val21], [day2, va12, val22], [day3, val13, val23], ...的多个列表。我想将其转换为数据帧;这是使用以下代码完成的:

df4 = pd.DataFrame(columns=sorted(set_days))
for date in dic.keys():
days = [day  for day, val1, val2  in dic[date]]
val1 = [val1 for day, val1, val2  in dic[date]]
df4.loc[date, days] = val1

此代码工作正常,但需要两个多小时才能运行。 经过一些研究,我意识到我可以通过multiprocessing库并行化它;以下代码是预期的并行版本

import multiprocessing
def func(date):
global df4, dic
days = [day  for day, val1, val2  in dic[date]]
val1 = [val1 for day, val1, val2  in dic[date]]
df4.loc[date, days] = val1
multiprocessing.Pool(processes=8).map(func, dic.keys())

此代码的问题在于,在执行multiprocessing.Pool(processes...后,df4数据帧为空。

任何帮助将不胜感激。

假设字典包含两天:

dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]
dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]

则数据帧的格式应为:

df4.loc[:, 1:50]
1    2    3    4    5   ...     46   47   48     49     50
20030812  24.25  NaN  NaN  NaN  NaN  ...    NaN  NaN  NaN    NaN  22.85
20030813  24.23  NaN  NaN  NaN  NaN  ...    NaN  NaN  NaN  22.74    NaN

dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])
df1.head().to_dict()
Out: 
{1: {'20030812': 24.25, '20030813': 24.23},
2: {'20030812': nan, '20030813': nan},
3: {'20030812': nan, '20030813': nan},
4: {'20030812': nan, '20030813': nan},
5: {'20030812': nan, '20030813': nan},
6: {'20030812': nan, '20030813': nan},
7: {'20030812': nan, '20030813': nan},
8: {'20030812': nan, '20030813': nan},
9: {'20030812': nan, '20030813': nan},
10: {'20030812': nan, '20030813': nan},
11: {'20030812': nan, '20030813': nan},
12: {'20030812': nan, '20030813': nan},
13: {'20030812': nan, '20030813': nan},
14: {'20030812': nan, '20030813': nan},
15: {'20030812': nan, '20030813': nan},
16: {'20030812': nan, '20030813': nan},
17: {'20030812': nan, '20030813': nan},
18: {'20030812': nan, '20030813': nan},
19: {'20030812': nan, '20030813': 23.4},
20: {'20030812': 23.54, '20030813': nan},
21: {'20030812': nan, '20030813': nan},
22: {'20030812': nan, '20030813': nan},
23: {'20030812': nan, '20030813': nan},
24: {'20030812': nan, '20030813': nan},
25: {'20030812': nan, '20030813': nan},
26: {'20030812': nan, '20030813': nan},
27: {'20030812': nan, '20030813': nan},
28: {'20030812': nan, '20030813': nan},
29: {'20030812': nan, '20030813': nan},
30: {'20030812': 23.13, '20030813': 22.97},
31: {'20030812': nan, '20030813': nan},
32: {'20030812': nan, '20030813': nan},
...

为了回答你最初的问题(粗略地说:"为什么df4数据帧是空的?"(,这不起作用的原因是,当Pool工作线程启动时,每个工作线程都会继承父级数据的个人写入时复制视图(如果multiprocessing在具有fork的类 UNIX 系统上运行,则直接继承,或者通过笨拙的方法在 Windows 上运行时模拟它(。

因此,当每个工作人员执行以下操作时:

df4.loc[date, days] = val1

它改变了工作人员的个人df4副本;父进程的副本保持不变。

通常,有三种方法可以解决此问题:

  1. 更改工作线程函数以返回可在父进程中使用的内容。例如,与其尝试使用df4.loc[date, days] = val1执行就地突变,不如返回在父级中执行此操作所需的内容,例如return date, days, val1,然后将父项更改为:

    for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()):
    df4.loc[date, days] = val
    

    这种方法的缺点是,它要求每个返回值都经过酸洗(Python 的序列化版本(,从子级管道传输到父级,并且未被腌制;如果工作线程任务没有做太多工作,特别是如果返回值很大(在这种情况下,似乎是这种情况(,它很容易在序列化和 IPC 上花费比并行性更多的时间。

  2. 使用共享对象/内存(在"多处理写入 pandas 数据帧"的回答中演示(。在实践中,这通常不会给你带来多大好处,因为不是基于使用multiprocessing.sharedctypes共享的更"原始"ctypes的东西最终仍然需要将数据从一个进程传输到另一个进程; 不过,基于sharedctypes的东西可以获得有意义的速度提升,因为一旦映射,共享原始 C 数组的访问速度几乎与本地内存一样快。

  3. 如果并行化的工作是 I/O 绑定的,或者使用第三方 C 扩展进行 CPU 密集型工作(例如numpy(,尽管 GIL 干扰,您也许能够从线程中获得所需的速度提升,并且线程确实共享相同的内存。您的情况似乎既不受 I/O 约束,也没有有意义地依赖于可能释放 GIL 的第三方 C 扩展,因此在这里可能无济于事,但一般来说,从基于进程的并行性切换到基于线程的并行性的简单方法(当您已经在使用multiprocessing时(是将import从:

    import multiprocessing
    

    import multiprocessing.dummy as multiprocessing
    

    它将线程支持的multiprocessing版本导入到预期名称下,因此代码可以从使用进程无缝切换到线程。

正如RafaelC所暗示的那样,这是一个XY问题。 我已经能够在没有多处理的情况下将执行时间减少到 20 秒。

我创建了一个替换字典的列表,并且,一旦列表已满,我就将列表转换为数据帧,而不是为每个日期向 df4 数据帧添加一行。

# Returns the largest day from  all the dates (each date has a different number of days)
def longest_series(dic):
largest_series = 0
for date in dic.keys():
# get the last day's table of a specific date
current_series = dic[date][-1][0]
if largest_series < current_series:
largest_series = current_series
return largest_series

ls = longest_series(dic)
l_total_days = list(range(1, ls+1))
s_total_days = set(l_total_days)
# creating lista list, lista is similar to dic 
#The difference is that, in lista, every date has the same number of days 
#i.e. from 1 to ls, and it does not contain the dates.
# It takes 15 seconds
lista = list()
for date in dic.keys():
present_days = list()
presen_values = list()
for day, val_252, _ in dic[date]:
present_days.append(day)
presen_values.append(val_252)
missing_days = list(s_total_days.difference(set(present_days))) # extra days added to date
missing_values = [None] * len(missing_days)                     # extra values added to date
all_days_index = list(np.argsort(present_days + missing_days))  # we need to preserve the order between days and values
all_day_values = presen_values + missing_values  
lista.append(list(np.array(all_day_values)[all_days_index]))

# It takes 4 seconds
df = pd.DataFrame(lista, index= dic.keys(), columns=l_total_days)

相关内容

  • 没有找到相关文章

最新更新