Using multiprocessing on a split pandas DataFrame and updating a global tqdm progress bar per iteration



For performance reasons, I split a large DataFrame into several smaller ones, iterate over each of them, and do some calculations. Now I am trying to create a global progress bar that shows me all iterations done so far. Of course, with multiprocessing, a separate progress bar is created for each individual process instead. Is there a way to update one "overall" progress bar? Unfortunately, I could not find an answer in other forum posts, because I do not want to see the progress of each individual subprocess or how many processes have finished, but the iterations done inside the "do_calculations" function. My code is:

import multiprocessing as mp
import numpy as np
from tqdm import tqdm
import pandas as pd

# load the initial dataframe
initial_df = pd.read_csv(r"...")  # let's assume len(initial_df)=60

# create a "global" progress bar
pbar = tqdm(total=len(initial_df))

def do_calculations(sub_df):
    """Function that calculates some things for each row of a sub_dataframe."""
    # iterate through the sub_dataframe
    for index, row in sub_df.iterrows():
        # do some calculations

        # here I want to update the "global" progress bar for all parallel
        # processes
        global pbar
        pbar.update(1)
    return sub_df

def execute():
    """Function that executes the 'do_calculations' function using multiprocessing."""
    num_processes = mp.cpu_count() - 2  # let's assume num_processes=6
    pool = mp.Pool(processes=num_processes)
    # split the initial dataframe
    divided_df = np.array_split(initial_df, num_processes)
    # execute the 'do_calculations' function using multiprocessing
    # and re-join the dataframe
    new_df = pd.concat(pool.map(do_calculations, divided_df))
    pool.close()
    pool.join()
    return new_df

if __name__ == "__main__":
    new_data = execute()

My result so far:

17%|█▋        | 10/60 [00:05<00:25,  1.99it/s]
17%|█▋        | 10/60 [00:05<00:25,  1.97it/s]
17%|█▋        | 10/60 [00:05<00:25,  1.98it/s]
17%|█▋        | 10/60 [00:05<00:25,  1.96it/s]
17%|█▋        | 10/60 [00:05<00:25,  1.98it/s]
17%|█▋        | 10/60 [00:05<00:25,  1.98it/s]
0%|          | 0/60 [00:08<?, ?it/s]

The result I would like is the number of iterations done inside the do_calculations function:

100%|██████████| 60/60 [00:13<00:00,  4.42it/s]

and not which step the 'map' function is on:

100%|██████████| 6/6 [00:04<00:00,  1.28it/s]

I would appreciate your help! Thanks in advance.

Here is an example of using tqdm with multiprocessing.pool.imap (source):

import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm

def do_calculations(sub_df):
    """Function that calculates some things for each row of a sub_dataframe."""
    # iterate through the sub_dataframe
    for index, row in sub_df.iterrows():
        # do some calculations
        pass
    return sub_df

def execute():
    """Function that executes the 'do_calculations' function using multiprocessing."""
    num_processes = mp.cpu_count() - 2
    # Split the initial dataframe.
    # Create 4 times more divided dataframes than processes being used to show progress.
    divided_df = np.array_split(initial_df, num_processes * 4)
    with mp.Pool(processes=num_processes) as pool:
        # Inspiration: https://stackoverflow.com/a/45276885/4856719
        results = list(tqdm(pool.imap(do_calculations, divided_df), total=len(divided_df)))
    new_df = pd.concat(results, axis=0, ignore_index=True)
    return new_df

if __name__ == "__main__":
    # load the initial dataframe (replaced with random data for the example)
    initial_df = pd.DataFrame(np.random.randint(0, 100, size=(10_000_000, 4)), columns=list('ABCD'))
    new_data = execute()

Output (using 18 of 20 threads):

100%|██████████| 72/72 [00:35<00:00,  2.05it/s]

This solution only helps when the dataframe is split into more chunks than there are processes. Otherwise (when all processes take about the same time), the progress bar only "moves" once, at the very end.
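If you need the bar to advance once per row rather than once per chunk, one possible alternative (a minimal sketch, not part of the answer above) is to share a multiprocessing.Value counter with the workers through the pool's initializer and let the parent process poll it while the work runs. The names init_worker and shared_counter, and the polling interval, are my own choices for illustration:

import multiprocessing as mp
import time

import numpy as np
import pandas as pd
from tqdm import tqdm

counter = None  # set by init_worker inside each worker process

def init_worker(shared_counter):
    """Make the shared counter available as a module-level name in each worker."""
    global counter
    counter = shared_counter

def do_calculations(sub_df):
    """Per-row work that bumps the shared counter once per processed row."""
    for index, row in sub_df.iterrows():
        # do some calculations, then count this row as done
        with counter.get_lock():
            counter.value += 1
    return sub_df

def execute(initial_df):
    num_processes = max(mp.cpu_count() - 2, 1)
    divided_df = np.array_split(initial_df, num_processes)
    shared_counter = mp.Value("i", 0)  # one integer shared by all workers
    with mp.Pool(num_processes, initializer=init_worker,
                 initargs=(shared_counter,)) as pool:
        async_result = pool.map_async(do_calculations, divided_df)
        # poll the counter from the parent and mirror it into a single tqdm bar
        with tqdm(total=len(initial_df)) as pbar:
            while not async_result.ready():
                pbar.n = shared_counter.value
                pbar.refresh()
                time.sleep(0.1)
            pbar.n = shared_counter.value
            pbar.refresh()
        results = async_result.get()
    return pd.concat(results)

if __name__ == "__main__":
    df = pd.DataFrame(np.random.randint(0, 100, size=(60, 4)), columns=list("ABCD"))
    new_data = execute(df)

Note that the counter increment needs the lock, and the bar is only as fresh as the polling interval. For 60 rows this is overkill, but for long-running per-row work it gives the per-iteration view asked for in the question.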
