Returning results is very slow in multiprocessing code. What should I do?



Code without multiprocessing:

from time import time
func1Results = []
def func1(valList):
    num = 0
    for val in valList:
        num += val
    func1Results.append(num)
if __name__ == '__main__':
    st = time()

    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        func1(valList)
    ed = time()

    for r in func1Results:
        print(r)
    print(ed - st)

Output:
799999980000000
799999980000000
799999980000000
13.679918918526001

Multiprocessing code:

from multiprocessing import Process, Queue
from time import time
queue = Queue()
processList, func1Results = [], []
def func1(valList, queue):
    num = 0
    for val in valList:
        num += val
    queue.put(num)
if __name__ == '__main__':
    st = time()
    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        xProcess = Process(target=func1, args=(valList, queue))
        xProcess.start()

        func1Results.append(queue.get()), processList.append(xProcess)
    for xProcess in processList:
        xProcess.join()
    ed = time()
    for i in func1Results:
        print(i)
    print(ed - st)

Output:
799999980000000
799999980000000
799999980000000
13.916456937789917

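When I use the "put" and "get" calls, the processing time of the multiprocessing code increases significantly. I know that returning results in multiprocessing is fairly expensive, but it is exactly what I need. What can I do to return results more efficiently?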

Here is one way to refactor the original code, in which all child processes are allowed to finish before the queue is checked.

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from functools import partial
import time
N = 40000000
def calc(q, rng):
    num = 0
    for n in rng:
        num += n
    q.put(num)
def main():
    with Manager() as manager:
        # A manager queue is a proxy object, so it can be passed to pool workers
        queue = manager.Queue()
        rlist = [range(N), range(N), range(N), range(N)]
        p = partial(calc, queue)
        # Leaving the with block waits for every submitted task to finish
        with ProcessPoolExecutor() as executor:
            executor.map(p, rlist)
        # All workers are done, so the queue now holds every result
        while not queue.empty():
            print(queue.get())
if __name__ == '__main__':
    start = time.perf_counter()
    main()
    end = time.perf_counter()
    print(f'Duration = {end-start:.2f}s')

Output:

799999980000000
799999980000000
799999980000000
799999980000000
Duration = 1.93s
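
A likely reason the original version is no faster than the sequential one: queue.get() appears to be called right after each start(), inside the loop (the indentation of the posted code was lost, so this is an assumption). get() blocks until that child has put its result, so the next process is only started once the previous one has finished, and the work still runs one process at a time. The following is a minimal sketch of the same idea with Process and Queue kept, where every process is started before any result is read. The helper name run_all and the reduced N are illustrative choices, not part of the original post.

```python
from multiprocessing import Process, Queue
from time import perf_counter

# The question uses 40_000_000; reduced here so the demo finishes quickly.
N = 4_000_000


def func1(val_list, queue):
    """Sum the values and hand the total back through the queue."""
    num = 0
    for val in val_list:
        num += val
    queue.put(num)


def run_all(n, workers=4):
    """Hypothetical helper: run `workers` sums of range(n) in parallel."""
    queue = Queue()
    processes = [
        Process(target=func1, args=(range(n), queue)) for _ in range(workers)
    ]
    # Start every child first so they all compute at the same time.
    for p in processes:
        p.start()
    # Exactly one result per child; each get() blocks until one is available.
    results = [queue.get() for _ in processes]
    # Join only after the queue has been drained.
    for p in processes:
        p.join()
    return results


if __name__ == '__main__':
    st = perf_counter()
    for r in run_all(N):
        print(r)
    print(perf_counter() - st)
```

Reading from the queue before calling join() is deliberate: the multiprocessing documentation warns that joining a process which still has items buffered in a queue can deadlock. Results arrive in completion order, which does not matter here because all four sums are identical.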

Note:

Of course, you don't need a queue to get results back from the child processes. You can do this instead:

from concurrent.futures import ProcessPoolExecutor
import time
N = 40000000
def calc(rng):
    num = 0
    for n in rng:
        num += n
    return num
def main():
    rlist = [range(N), range(N), range(N), range(N)]
    with ProcessPoolExecutor() as executor:
        # map() yields the return values in the same order as rlist
        print(*executor.map(calc, rlist), sep='\n')

if __name__ == '__main__':
    start = time.perf_counter()
    main()
    end = time.perf_counter()
    print(f'Duration = {end-start:.2f}s')

Output:

799999980000000
799999980000000
799999980000000
799999980000000
Duration = 1.83s
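
executor.map hands the results back in input order, so a slow first task delays the printing of everything behind it. If each result should be handled as soon as its worker finishes, one option is submit() together with as_completed(). This is only a sketch and not part of the original answer; the differing range sizes are made up so that the tasks finish at different times.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def calc(rng):
    """Sum the values in rng and return the total to the parent process."""
    num = 0
    for n in rng:
        num += n
    return num


def main():
    # Illustrative sizes (assumption): unequal workloads finish at different times.
    sizes = [4_000_000, 1_000_000, 2_000_000, 3_000_000]
    with ProcessPoolExecutor() as executor:
        # Map each future back to the size it was submitted with.
        futures = {executor.submit(calc, range(n)): n for n in sizes}
        # as_completed() yields each future as soon as its task is done.
        for future in as_completed(futures):
            print(futures[future], future.result())


if __name__ == '__main__':
    main()
```

The printed order depends on which worker finishes first, so it can differ between runs. When input order matters, executor.map as used above remains the simpler choice.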
