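I'm using multiprocessing's Pool and its .apply_async() method in Python to run several workers concurrently.
But there is a problem when I use with instead of creating the pool instance explicitly.
Here's what I've done so far: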
Common part of the code:
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []

def log_result(result):
    result_list.append(result)
The first snippet, which works fine (the Python 2 way):
tick = time()
pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i,), callback=log_result)
pool.close()
pool.join()
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i) # Indicates that all iteration has been done.
Out:
1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time: 6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5
The second snippet, which also works fine (the Python 3 way):
tick = time()
with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)
print('Total elapsed time: ', time() - tick)
print(i) # Indicates that all iteration has been done.
Out:
0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time: 6.017550945281982
5
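Extra:
Judging by the timings (6.018 s vs. 6.023 s), the Python 3 way appears slightly faster than the Python 2 way.
Question:
Now here is the problem: I want to combine the Python 2 style Pool approach with the Python 3 style with statement, but the tasks never complete: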
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i) # Indicates that all iteration has been done.
Out:
Total elapsed time: 0.10628008842468262
[]
5
However, if I put a sleep(1) after pool.apply_async(...), some of the shorter tasks do finish (the sleep acts as a block):
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i) # Indicates that all iteration has been done.
Out:
0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time: 6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5
What am I missing?
concurrent.futures.Executor and multiprocessing.Pool have two completely different context manager implementations.
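According to the documentation, concurrent.futures.Executor calls shutdown(wait=True) on exit, which effectively waits for all queued jobs to finish:
You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True).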
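A minimal sketch of what that guarantee buys you. It assumes you also want the return values. Because leaving the with ProcessPoolExecutor() block waits on every submitted future, the results can be read after the block without further waiting. The worker here is a shortened variant of the question's (0.1 s per unit, no prints), and run_with_executor is a hypothetical helper, not part of the question.

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep


def worker(x):
    # Simulate a task that takes x * 0.1 seconds.
    sleep(x * 0.1)
    return f"{x} finished."


def run_with_executor(n=6):
    # Leaving the with block calls executor.shutdown(wait=True),
    # so every submitted future is complete once the block ends.
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(n)]
    # All futures are already done here, so result() returns immediately.
    return [f.result() for f in futures]


if __name__ == "__main__":
    # The main guard is required on platforms that start workers with "spawn".
    print(run_with_executor())
```

Results come back in submission order because they are read from the futures list, not from a callback.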
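multiprocessing.Pool, on the other hand, calls terminate() instead of close() followed by join(), which interrupts all in-progress work prematurely. From the documentation:
Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().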
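To see why the question's third snippet prints an empty list, here is a rough hand-written equivalent of that with block. It is a sketch that assumes __exit__ does nothing beyond calling terminate(), as the documentation states. slow_worker and run_like_with_block are hypothetical names. Each task sleeps one second, so it cannot finish before terminate() runs.

```python
from multiprocessing import Pool
from time import sleep


def slow_worker(x):
    # Each task takes about one second, much longer than the block body.
    sleep(1)
    return x


def run_like_with_block(n=4):
    collected = []
    pool = Pool(2)
    try:
        # Roughly the body of `with Pool() as pool:` in the question.
        handles = [pool.apply_async(slow_worker, (i,), callback=collected.append)
                   for i in range(n)]
    finally:
        # What Pool.__exit__ does: stop the workers immediately.
        # Unfinished tasks are dropped and their callbacks never run.
        pool.terminate()
    return collected, [h.ready() for h in handles]


if __name__ == "__main__":
    print(run_like_with_block())
```

On a normally loaded machine this should report no collected results and False for every ready() check, matching the [] in the question's output.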
If you want to use multiprocessing.Pool with its context manager, you have to wait for the results yourself before the with block exits:
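with Pool() as pool:
    async_results = [pool.apply_async(worker, args=(i,), callback=log_result) for i in range(6)]
    # Block on every task before __exit__ runs terminate().
    for async_result in async_results:
        async_result.wait()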
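Another option, sketched under the same assumptions (a shortened worker and a hypothetical run_with_close_join helper), is to call close() and join() inside the with block. The pool then finishes all queued work, including the callbacks, before __exit__ calls terminate().

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    # Simulate a task that takes x * 0.1 seconds.
    sleep(x * 0.1)
    return f"{x} finished."


def run_with_close_join(n=6):
    result_list = []
    with Pool() as pool:
        for i in range(n):
            pool.apply_async(worker, args=(i,), callback=result_list.append)
        # Stop accepting new tasks, then block until every task and its
        # callback are done; both happen before __exit__ calls terminate().
        pool.close()
        pool.join()
    # Callbacks fire in completion order, so sort for a stable result.
    return sorted(result_list)


if __name__ == "__main__":
    print(run_with_close_join())
```

This is the question's first snippet wrapped in a with block. The with statement then only adds cleanup: if an exception is raised before join(), the pool is still terminated.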