For performance reasons, I split a large dataframe into several smaller dataframes, iterate over each one, and do some calculations. Now I am trying to create a single, global progress bar that shows all iterations done so far. With multiprocessing, it is of course possible to create one progress bar per process, but is there a way to update one "overall" bar instead? Unfortunately I could not find an answer in other forum posts, because I don't want to see the progress of each individual subprocess, nor how many processes have finished, but rather the iterations completed inside the "do_calculations" function. My code is:
import multiprocessing as mp

import numpy as np
import pandas as pd
from tqdm import tqdm

# load the initial dataframe
initial_df = pd.read_csv(r"...")  # let's assume len(initial_df)=60

# create a "global" progress bar
pbar = tqdm(total=len(initial_df))

def do_calculations(sub_df):
    """Function that calculates some things for each row of a sub_dataframe."""
    # iterate through the sub_dataframe
    for index, row in sub_df.iterrows():
        # do some calculations
        # here i want to update the "global" progress bar for all parallel
        # processes
        global pbar
        pbar.update(1)
    return sub_df

def execute():
    """Function that executes the 'do_calculations' function using multiprocessing."""
    num_processes = mp.cpu_count() - 2  # let's assume num_processes=6
    pool = mp.Pool(processes=num_processes)
    # split the initial dataframe
    divided_df = np.array_split(initial_df, num_processes)
    # execute the 'do_calculations' function using multiprocessing
    # and re-join the dataframe
    new_df = pd.concat(pool.map(do_calculations, divided_df))
    pool.close()
    pool.join()
    return new_df

if __name__ == "__main__":
    new_data = execute()
My result so far is:
17%|█▋ | 10/60 [00:05<00:25, 1.99it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.97it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.98it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.96it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.98it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.98it/s]
0%| | 0/60 [00:08<?, ?it/s]
The result I want is the number of iterations done inside the do_calculations function:
100%|██████████| 60/60 [00:13<00:00, 4.42it/s]
and not the number of steps of the "map" function, which would be:
100%|██████████| 6/6 [00:04<00:00, 1.28it/s]
I appreciate any help!! Thanks in advance.
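For illustration, a minimal sketch (not part of the original question) of why the module-level bar stays at 0/60 in the parent: pool.map runs do_calculations in child processes, and each child mutates its own copy of the global object, never the parent's.

```python
import multiprocessing as mp

counter = 0  # stands in for the module-level pbar

def bump(_):
    # each worker process increments ITS OWN copy of the global
    global counter
    counter += 1
    return counter

if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        pool.map(bump, range(4))
    # the parent's copy was never touched by the workers
    print(counter)  # -> 0
```

The six per-worker bars in the output above are exactly these independent copies, and the seventh, stuck bar is the parent's untouched original.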
Here is an example of using tqdm with multiprocessing.pool.imap (source):
import multiprocessing as mp

import numpy as np
import pandas as pd
from tqdm import tqdm

def do_calculations(sub_df):
    """Function that calculates some things for each row of a sub_dataframe."""
    # iterate through the sub_dataframe
    for index, row in sub_df.iterrows():
        # do some calculations
        pass
    return sub_df

def execute():
    """Function that executes the 'do_calculations' function using multiprocessing."""
    num_processes = mp.cpu_count() - 2
    # Split the initial dataframe.
    # Create 4 times more divided dataframes than processes being used to show progress.
    divided_df = np.array_split(initial_df, num_processes * 4)
    with mp.Pool(processes=num_processes) as pool:
        # Inspiration: https://stackoverflow.com/a/45276885/4856719
        results = list(tqdm(pool.imap(do_calculations, divided_df), total=len(divided_df)))
    new_df = pd.concat(results, axis=0, ignore_index=True)
    return new_df

if __name__ == "__main__":
    # load the initial dataframe (replaced)
    initial_df = pd.DataFrame(np.random.randint(0, 100, size=(10_000_000, 4)), columns=list('ABCD'))
    new_data = execute()
Output (using 18 of 20 threads):
100%|██████████| 72/72 [00:35<00:00, 2.05it/s]
Note that this solution only helps when more dataframe splits are created than processes are used. Otherwise (when all processes take roughly the same time), the progress bar only "moves" once, at the very end.