我有一个函数,其中有4个嵌套的for循环。该函数接收一个数据帧并返回一个新的数据帧。目前该功能运行大约需要2个小时,我需要它在大约30分钟内运行。。。
我尝试过使用4核进行多处理,但似乎无法正常工作。我首先创建一个输入数据帧的列表,该列表被分割成更小的块(list_of_df(
all_trips = uncov_df.TRIP_NO.unique()
list_of_df = []
for trip in all_trips:
list_of_df.append(uncov_df[uncov_df.TRIP_NO==trip])
然后,我尝试使用4个池将这个块列表映射到我的函数(transform_df(中。
from multiprocessing import Pool
if __name__ == "__main__":
with Pool(4) as p:
df_uncov = list(p.map(transform_df, list_of_df))
df = pd.concat(df_uncov)
当我运行上面的代码时,我的代码单元冻结,什么也没发生。有人知道发生了什么事吗?
这就是我使用starmap设置的方法。这将返回稍后要连接的dfs列表。
#put this above if __name__ == "__main__":
def get_dflist_multiprocess(keys_list, num_proc=4):
with Pool(num_proc) as p:
df_list = p.starmap(transform_df, list_of_df)
p.close()
p.join()
return df_list
#then below if __name__ == "__main__":
df_list = get_dflist_multiprocess(list_of_df, num_proc=4) #collect dataframes for each file
df_new = pd.concat(df_list, sort=False)