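I am trying to parallelize the piece of code below with the multiprocessing module. Everything I try results in the child processes running one after another, even though they all have different PIDs. I have tried:
- CentOS and macOS
- spawn and fork as the context
- using Queues and using Pools
- using apply and using map, and their async versions
- adding/removing pool.join() and Process.join()
I can't figure out what I am doing wrong.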
fs.py:
import numpy as np
from time import sleep
import os

def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}
playground.py:
import multiprocessing as mp
import numpy as np
from fs import f

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)
    print('Done!')
Command: python playground.py
Output:
I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]),
'dubs': array([ 6, 8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!
When I use p.map() like this (on Linux Mint):
res = p.map(f, subsets)
I get:
I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished
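Maybe you used map() the wrong way, for example:
res = [p.map(f, (subset, )) for subset in subsets]
The whole list of subsets should be passed to a single map() call: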
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)
    print('Done!')
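
To illustrate why a map() call per subset behaves differently from one map() call over all subsets, here is a minimal sketch. It makes some assumptions that are not in the original post: work() is a hypothetical stand-in for f() that avoids NumPy and sleeps only 0.2 s, and multiprocessing.pool.ThreadPool is swapped in for the process pool because it exposes the same map() method and lets the snippet run as a single file. The blocking behaviour of map() is the same in both pool types.

```python
import time
from multiprocessing.pool import ThreadPool  # stand-in with the same API as Pool


def work(r):
    """Hypothetical stand-in for f(): sleep briefly, then return the range."""
    time.sleep(0.2)
    return list(range(r[0], r[1]))


def timed(fn):
    """Run fn() and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


subsets = [[0, 3], [3, 6], [6, 7]]

with ThreadPool(4) as pool:
    # One map() call per subset: each call blocks until its single task is
    # done, so the three tasks run one after another.
    looped, looped_secs = timed(lambda: [pool.map(work, (s,)) for s in subsets])
    # A single map() call over all subsets: the tasks are spread across workers.
    single, single_secs = timed(lambda: pool.map(work, subsets))

print(looped)  # nested: one single-item list per map() call
print(single)  # flat: one result per subset
print(f'looped: {looped_secs:.2f}s, single: {single_secs:.2f}s')
```

Besides taking roughly three times as long, the looped version also returns a list of one-element lists rather than a flat list of results.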
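With apply_async you need two for loops, one to submit the tasks and one to collect the results:
items = [p.apply_async(f, (subset, )) for subset in subsets]
res = [x.get() for x in items]
print(res)
Both loops must be inside the with p: block: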
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)
        res = [x.get() for x in items]
        print(res)
    print('Done!')
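
The reason for the two loops can be seen by comparing them with a single loop that calls get() right after each apply_async(). The sketch below is only an illustration under assumptions: work() is a hypothetical replacement for f() with a 0.2 s sleep, and ThreadPool stands in for the process pool (same apply_async() interface) so the snippet is self-contained.

```python
import time
from multiprocessing.pool import ThreadPool  # stand-in with the same API as Pool


def work(r):
    """Hypothetical stand-in for f(): sleep briefly, then return the range."""
    time.sleep(0.2)
    return list(range(r[0], r[1]))


subsets = [[0, 3], [3, 6], [6, 7]]

with ThreadPool(4) as pool:
    # One loop: get() is called before the next task is submitted, so each
    # task has to finish before the following one even starts.
    start = time.perf_counter()
    one_loop = [pool.apply_async(work, (s,)).get() for s in subsets]
    one_loop_secs = time.perf_counter() - start

    # Two loops: submit everything first, then wait for the results.
    start = time.perf_counter()
    items = [pool.apply_async(work, (s,)) for s in subsets]
    two_loops = [x.get() for x in items]
    two_loops_secs = time.perf_counter() - start

print(f'one loop: {one_loop_secs:.2f}s, two loops: {two_loops_secs:.2f}s')
```

Both variants return the same results; only the two-loop form lets the tasks overlap.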
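The child processes run one after another because Pool.apply() blocks until the result is ready, which effectively prevents any parallel processing from happening.
Using Pool.map_async() avoids this. Note that I also made the delay in f() variable so that the processing times differ.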
playground.py:
import multiprocessing as mp
import numpy as np
from pprint import pprint
from fs import f

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(4)
    with pool:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = pool.map_async(f, subsets).get(timeout=10)
        pprint(res, sort_dicts=False)
    print('Done!')
fs.py:
import numpy as np
import os
import random
from time import sleep

def f(r):
    print(f'f({r}) called')
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(random.uniform(0, 2))  # Random time delay.
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}
Result:
f([0, 3]) called
I am 2120
f([3, 6]) called
I am 32208
f([6, 7]) called
I am 13884
I am 2120 and I am finished
I am 13884 and I am finished
I am 32208 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])},
{'nums': array([3, 4, 5]), 'dubs': array([ 6, 8, 10])},
{'nums': array([6]), 'dubs': array([12])}]
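
As a rough illustration of what map_async(...).get(timeout=...) does, the sketch below checks that map_async() returns before the tasks finish and that get() raises multiprocessing.TimeoutError when the timeout is too short. The assumptions here are mine, not the answer's: work() is a hypothetical stand-in for f() with a fixed 0.3 s sleep, and ThreadPool replaces the process pool so the snippet runs as one file.

```python
import multiprocessing as mp
import time
from multiprocessing.pool import ThreadPool  # stand-in with the same API as Pool


def work(r):
    """Hypothetical stand-in for f(): sleep briefly, then return the range."""
    time.sleep(0.3)
    return list(range(r[0], r[1]))


subsets = [[0, 3], [3, 6], [6, 7]]

with ThreadPool(4) as pool:
    pending = pool.map_async(work, subsets)
    # map_async() does not block, so the tasks are still sleeping here.
    ready_at_once = pending.ready()

    try:
        pending.get(timeout=0.05)  # too short for a 0.3 s task
        timed_out = False
    except mp.TimeoutError:
        timed_out = True

    # A generous timeout lets the same AsyncResult deliver the results.
    results = pending.get(timeout=10)

print(ready_at_once, timed_out, results)
```

A timeout in get() does not cancel the tasks; it only stops the caller from waiting, which is why the second get() still succeeds.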