Parallelization with multiprocessing runs processes sequentially instead of simultaneously



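I'm trying to parallelize the code below with the multiprocessing module. Everything I've tried results in the child processes running one after another, even though they all have different PIDs. I have tried: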

  1. CentOS and macOS
  2. spawn and fork as the context
  3. Using a Queue and using a Pool
  4. Using apply() and map(), and their async versions
  5. Adding/removing pool.join() and Process.join()

I don't know what I'm doing wrong.

fs.py:

import numpy as np
from time import sleep
import os
def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

playground.py:

import multiprocessing as mp
import numpy as np
from fs import f

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)
    print('Done!')

Command: python playground.py

Output:

I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]), 
'dubs': array([ 6,  8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!

When I use p.map() like this (on Linux Mint):

res = p.map(f, subsets)

I get:

I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished

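Maybe you used map() incorrectly, like this:

res = [p.map(f, (subset, )) for subset in subsets]

That calls map() once per subset, and each call blocks until its single item is finished, so the subsets still run one after another. Pass the whole list to a single map() call instead: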


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)

    print('Done!')
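A minimal sketch of the difference between the two map() patterns, assuming a POSIX system where the fork start method is available so it runs as a single script (with spawn, as in the question, the worker would have to live in an importable module such as fs.py). work(), overlapped() and run() are hypothetical helpers: work() replaces f() with a 0.3 s sleep and records wall-clock start and end times, and overlapped() reports whether all tasks were running at the same moment.

```python
import multiprocessing as mp
import os
import time


def work(r):
    """Hypothetical stand-in for f(): sleeps briefly and records when it ran."""
    start = time.time()
    time.sleep(0.3)  # simulated work
    return {'range': r, 'pid': os.getpid(), 'start': start, 'end': time.time()}


def overlapped(records):
    """True if every task started before the first one finished."""
    return max(x['start'] for x in records) < min(x['end'] for x in records)


def run(ctx):
    subsets = [[0, 3], [3, 6], [6, 7]]
    with ctx.Pool(4) as p:
        # One map() per subset: each call blocks until its single item is done.
        looped = [p.map(work, (subset, ))[0] for subset in subsets]
        # One map() over the whole list: the pool hands items to idle workers.
        together = p.map(work, subsets)
    return looped, together


if __name__ == '__main__':
    # Assumption: the 'fork' start method is available (POSIX).
    looped, together = run(mp.get_context('fork'))
    print('map() in a loop ran in parallel:', overlapped(looped))
    print('single map() ran in parallel:', overlapped(together))
```

On a typical Linux machine the looped version never overlaps, because the next map() call is only made after the previous one has returned.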

With apply_async() you need two loops: the first submits every task, and the second collects the results with get().

items = [p.apply_async(f, (subset, )) for subset in subsets]
res = [x.get() for x in items]
print(res)

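Both loops must be inside with p:, because leaving the with block calls pool.terminate(), which stops any tasks that have not finished yet.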


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)

        res = [x.get() for x in items]
        print(res)

    print('Done!')
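A minimal sketch of two ways to collect apply_async() results safely, again assuming the fork start method so it runs as one script. work() is a hypothetical numpy-free version of f(); collect_inside_with() keeps both loops inside the with block, and collect_after_close() is an alternative that replaces the with block by close() and join(), so the results can be read after the pool has shut down cleanly.

```python
import multiprocessing as mp
import time


def work(r):
    """Hypothetical numpy-free stand-in for f()."""
    time.sleep(0.2)  # simulated work
    nums = list(range(r[0], r[1]))
    return {'nums': nums, 'dubs': [n * 2 for n in nums]}


def collect_inside_with(ctx, subsets):
    with ctx.Pool(4) as p:
        items = [p.apply_async(work, (s, )) for s in subsets]  # loop 1: submit all tasks
        return [x.get() for x in items]  # loop 2: wait, still inside the with block


def collect_after_close(ctx, subsets):
    p = ctx.Pool(4)
    items = [p.apply_async(work, (s, )) for s in subsets]
    p.close()  # no more submissions; already queued tasks still run
    p.join()   # wait for the workers to finish and exit
    return [x.get() for x in items]


if __name__ == '__main__':
    ctx = mp.get_context('fork')  # assumption: POSIX system with fork
    subsets = [[0, 3], [3, 6], [6, 7]]
    print(collect_inside_with(ctx, subsets))
    print(collect_after_close(ctx, subsets))
```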

Each child process runs one after another because Pool.apply() blocks until the result is ready, which effectively prevents any parallel processing.
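The blocking behaviour of apply() can be measured with a small timing sketch, assuming the fork start method is available. work() is a hypothetical task that sleeps for 0.3 s: three tasks submitted with apply() in a loop take at least 0.9 s in total, while the same tasks submitted with apply_async() and collected afterwards should take roughly 0.3 s plus pool overhead.

```python
import multiprocessing as mp
import time


def work(r):
    """Hypothetical stand-in for f(): sleeps 0.3 s, then returns its input."""
    time.sleep(0.3)
    return r


def compare(ctx, subsets):
    with ctx.Pool(4) as p:
        t0 = time.perf_counter()
        blocking = [p.apply(work, (s, )) for s in subsets]  # each apply() waits for its result
        t_blocking = time.perf_counter() - t0

        t0 = time.perf_counter()
        pending = [p.apply_async(work, (s, )) for s in subsets]  # submit everything first
        nonblocking = [a.get() for a in pending]  # then wait for all of them
        t_nonblocking = time.perf_counter() - t0
    return blocking, t_blocking, nonblocking, t_nonblocking


if __name__ == '__main__':
    subsets = [[0, 3], [3, 6], [6, 7]]
    _, t1, _, t2 = compare(mp.get_context('fork'), subsets)  # assumption: fork available
    print(f'apply() loop:       {t1:.2f} s')
    print(f'apply_async() loop: {t2:.2f} s')
```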

Using Pool.map_async() avoids this blocking. Note that I also made the delay in f() random so that the processing times vary.

playground.py

import multiprocessing as mp
import numpy as np
from pprint import pprint
from fs import f

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(4)
    with pool:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = pool.map_async(f, subsets).get(timeout=10)
        pprint(res, sort_dicts=False)
    print('Done!')

fs.py

import numpy as np
import os
import random
from time import sleep
def f(r):
    print(f'f({r}) called')
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(random.uniform(0, 2))  # Random time delay.
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

Result:

f([0, 3]) called
I am 2120
f([3, 6]) called
I am 32208
f([6, 7]) called
I am 13884
I am 2120 and I am finished
I am 13884 and I am finished
I am 32208 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])},
 {'nums': array([3, 4, 5]), 'dubs': array([ 6,  8, 10])},
 {'nums': array([6]), 'dubs': array([12])}]
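The output above also shows that map_async() returns the results in input order even though the workers finished in a different order. A minimal sketch that makes this deterministic, assuming the fork start method is available: the hypothetical work() takes a (label, delay) pair, with fixed delays in place of random.uniform() chosen so the tasks finish in reverse order, and run() also checks that map_async() returns before the work is done.

```python
import multiprocessing as mp
import time


def work(job):
    """Hypothetical stand-in for f(): job is a (label, delay) pair."""
    label, delay = job
    time.sleep(delay)  # fixed delay instead of random.uniform(0, 2)
    return label, time.time()  # label and wall-clock finish time


def run(ctx):
    jobs = [('a', 0.6), ('b', 0.3), ('c', 0.0)]  # 'c' finishes first, 'a' last
    with ctx.Pool(4) as pool:
        async_result = pool.map_async(work, jobs)  # returns immediately
        ready_at_submit = async_result.ready()  # almost certainly False: 'a' is still sleeping
        results = async_result.get(timeout=10)
    return ready_at_submit, results


if __name__ == '__main__':
    ready_at_submit, results = run(mp.get_context('fork'))  # assumption: fork available
    print('ready right after submit:', ready_at_submit)
    print('result order:', [label for label, _ in results])
```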