我有以下代码,其中create_data()
引用了我之前定义的函数。
%%time
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os
with Pool(processes=os.cpu_count()) as pool:
results = pool.map(create_data, date)
data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns = cols)
data.to_csv("%s"%str(date), index=False)
我基本上想调用create_data()
,同时传递日期参数。然后,所获得的所有结果将被收集到results
变量中。然后,我将把它们组合成一个列表,并将其转换为一个数据帧。函数create_data
的计算量相当大,因此需要很长时间来计算。这就是为什么我需要进度条来查看流程。
我试着把这句话改成下面的。
results = list(tqdm(pool.map(create_od, date), total = os.cpu_count()))
但它似乎不起作用。我已经等了很长时间,没有进度条出现。我在这里该怎么办?
cf多处理。Pool.map:
它会阻塞,直到结果准备好
和tqdm.tqdm:
装饰一个可迭代对象,返回一个迭代器,它的行为与原始可迭代对象完全相同,但每次请求值时都会打印一个动态更新的进度条。
因此,在调用tqdm
之前,map
ping已完全完成。
我用这个代码复制:
from time import sleep
from tqdm import tqdm
from multiprocessing import Pool
def crunch(numbers):
print(numbers)
sleep(2)
if __name__ == "__main__":
with Pool(processes=4) as pool:
print("mapping ...")
results = tqdm(pool.map(crunch, range(40)), total=40)
print("done")
打印:
mapping ...
0
3
6
[...]
37
38
0%| | 0/40 [00:00<?, ?it/s]done
相反,您应该使用lazy版本multiprocessing.Pool.imap
:它将立即返回一个生成器,您必须对其进行迭代才能获得实际结果,该生成器可以封装在tqdm
中。
from time import sleep
from tqdm import tqdm
from multiprocessing import Pool
def crunch(numbers):
# print(numbers) # commented out to not mess the tqdm output
sleep(2)
if __name__ == "__main__":
with Pool(processes=4) as pool:
progress_bar = tqdm(total=40)
print("mapping ...")
results = tqdm(pool.imap(crunch, range(40)), total=40)
print("running ...")
tuple(results) # fetch the lazy results
print("done")
打印:
mapping ...
running ...
0%| | 0/40 [00:00<?, ?it/s]
2%|▎ | 1/40 [00:02<01:35, 2.45s/it]
12%|█▎ | 5/40 [00:04<00:27, 1.26it/s]
22%|██▎ | 9/40 [00:06<00:19, 1.58it/s]
32%|███▎ | 13/40 [00:08<00:15, 1.74it/s]
42%|████▎ | 17/40 [00:10<00:12, 1.83it/s]
52%|█████▎ | 21/40 [00:12<00:10, 1.89it/s]
62%|██████▎ | 25/40 [00:14<00:07, 1.92it/s]
72%|███████▎ | 29/40 [00:16<00:05, 1.95it/s]
82%|████████▎ | 33/40 [00:18<00:03, 1.96it/s]
100%|██████████| 40/40 [00:20<00:00, 1.95it/s]
done
(进度条在多行上,因为我在Windows终端上的PyCharm不支持r
,但它在你的终端上应该可以正常工作(
更新
看看@Lenormnu的答案,他走在了正确的轨道上。
然而,imap
方法的问题在于,它保证按参数的顺序返回结果。因此,如果date
列表的第一个元素的处理需要非常长的时间(这是最后一个完成的任务(,进度条将不会前进,直到该任务完成。但到那时,所有其他提交的任务都已完成,进度条将立即跳到100%。诚然,这种情况不太可能发生。但如果您能按完成顺序处理结果,那就更好了。可以使用imap_unordered
,但要按照任务提交的顺序恢复结果,需要首先修改create_data
函数。
如果使用concurrent.futures
模块中的类ProcessPoolExecutor
进行多处理,实际上根本不必修改create_data
函数,因为方法submit
创建了Future
实例,您可以将这些实例作为值为索引的键存储在字典中,然后将as_completed
方法应用于字典,以按完成顺序返回已完成的任务并恢复索引。在这里,我更聪明了一点,所以不需要排序:
import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
# total argument for tqdm is just the number of submitted tasks:
with tqdm.tqdm(total=len(date)) as progress_bar:
futures = {}
for idx, dt in enumerate(date):
future = executor.submit(create_data, dt)
futures[future] = idx
results = [None] * len(date) # pre_allocate slots
for future in as_completed(futures):
idx = futures[future] # order of submission
results[idx] = future.result()
progress_bar.update(1) # advance by 1
data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns = cols)