在Python中使用多处理时,如何显示进度条(tqdm)



我有以下代码,其中create_data()引用了我之前定义的函数。

%%time
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os
with Pool(processes=os.cpu_count()) as pool:
results = pool.map(create_data, date)
data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns = cols)
data.to_csv("%s"%str(date), index=False)

我基本上想调用create_data(),同时传递日期参数。然后,所获得的所有结果将被收集到results变量中。然后,我将把它们组合成一个列表,并将其转换为一个数据帧。函数create_data的计算量相当大,因此需要很长时间来计算。这就是为什么我需要进度条来查看流程。

我试着把这句话改成下面的。

results = list(tqdm(pool.map(create_od, date), total = os.cpu_count()))

但它似乎不起作用。我已经等了很长时间,没有进度条出现。我在这里该怎么办?

cf多处理。Pool.map:

它会阻塞,直到结果准备好

和tqdm.tqdm:

装饰一个可迭代对象,返回一个迭代器,它的行为与原始可迭代对象完全相同,但每次请求值时都会打印一个动态更新的进度条。

因此,在调用tqdm之前,mapping已完全完成。

我用这个代码复制:

from time import sleep
from tqdm import tqdm
from multiprocessing import Pool

def crunch(numbers):
print(numbers)
sleep(2)

if __name__ == "__main__":
with Pool(processes=4) as pool:
print("mapping ...")
results = tqdm(pool.map(crunch, range(40)), total=40)
print("done")

打印:

mapping ...
0
3
6
[...]
37
38
0%|          | 0/40 [00:00<?, ?it/s]done

相反,您应该使用lazy版本multiprocessing.Pool.imap:它将立即返回一个生成器,您必须对其进行迭代才能获得实际结果,该生成器可以封装在tqdm中。

from time import sleep
from tqdm import tqdm
from multiprocessing import Pool

def crunch(numbers):
# print(numbers)  # commented out to not mess the tqdm output
sleep(2)

if __name__ == "__main__":
with Pool(processes=4) as pool:
progress_bar = tqdm(total=40)
print("mapping ...")
results = tqdm(pool.imap(crunch, range(40)), total=40)
print("running ...")
tuple(results)  # fetch the lazy results
print("done")

打印:

mapping ...
running ...
0%|          | 0/40 [00:00<?, ?it/s]
2%|▎         | 1/40 [00:02<01:35,  2.45s/it]
12%|█▎        | 5/40 [00:04<00:27,  1.26it/s]
22%|██▎       | 9/40 [00:06<00:19,  1.58it/s]
32%|███▎      | 13/40 [00:08<00:15,  1.74it/s]
42%|████▎     | 17/40 [00:10<00:12,  1.83it/s]
52%|█████▎    | 21/40 [00:12<00:10,  1.89it/s]
62%|██████▎   | 25/40 [00:14<00:07,  1.92it/s]
72%|███████▎  | 29/40 [00:16<00:05,  1.95it/s]
82%|████████▎ | 33/40 [00:18<00:03,  1.96it/s]
100%|██████████| 40/40 [00:20<00:00,  1.95it/s]
done

(进度条在多行上,因为我在Windows终端上的PyCharm不支持r,但它在你的终端上应该可以正常工作(

更新

看看@Lenormnu的答案,他走在了正确的轨道上。

然而,imap方法的问题在于,它保证按参数的顺序返回结果。因此,如果date列表的第一个元素的处理需要非常长的时间(这是最后一个完成的任务(,进度条将不会前进,直到该任务完成。但到那时,所有其他提交的任务都已完成,进度条将立即跳到100%。诚然,这种情况不太可能发生。但如果您能按完成顺序处理结果,那就更好了。可以使用imap_unordered,但要按照任务提交的顺序恢复结果,需要首先修改create_data函数。

如果使用concurrent.futures模块中的类ProcessPoolExecutor进行多处理,实际上根本不必修改create_data函数,因为方法submit创建了Future实例,您可以将这些实例作为值为索引的键存储在字典中,然后将as_completed方法应用于字典,以按完成顺序返回已完成的任务并恢复索引。在这里,我更聪明了一点,所以不需要排序:

import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
# total argument for tqdm is just the number of submitted tasks:
with tqdm.tqdm(total=len(date)) as progress_bar:
futures = {}
for idx, dt in enumerate(date):
future = executor.submit(create_data, dt)
futures[future] = idx
results = [None] * len(date) # pre_allocate slots
for future in as_completed(futures):
idx = futures[future] # order of submission
results[idx] = future.result()
progress_bar.update(1) # advance by 1
data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns = cols)

相关内容

  • 没有找到相关文章

最新更新