如何提高Python中并行循环的效率



我很好奇Python中的并行循环与Matlab中的parloop相比效率有多低。在这里,我提出了一个简单的寻根问题,在ab之间强行进行10^6的初始猜测。

import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing
# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))
def forfunc(x0):
q = [root(func, xi).x for xi in x0]
q = np.array(q).T[0]
return q
# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses
# the single-process loop
q = forfunc(x0)
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()

单个工艺回路的壁面时间为1min 26s,并联回路的壁面的时间为1min 7s。我看到了一些改进,因为加速是1.28,但在这种情况下,效率(timeloop/timeparallel/n_process)是0.32。

这里发生了什么?如何提高效率?我做错什么了吗?

我还尝试过以两种方式使用dask.delayed

import dask
# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])
# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])

在这里,两者都比单个过程循环花费更多的时间。第一次尝试的壁面时间为3分钟,第二次尝试的时间为1分27秒。

Dask(或Spark(发生了什么

通过单进程测试,您的循环在90秒内执行了一百万个任务。因此,在平均情况下,每个任务占用CPU大约90微秒。

在像Dask或Spark这样提供灵活性和弹性的分布式计算框架中,任务的开销很小。Dask的开销低至每个任务200微秒。Spark 3.0文档表明,Spark可以支持短至200毫秒的任务,这可能意味着Dask的开销实际上比Spark少1000倍。听起来Dask在这里真的做得很好!

如果您的任务比框架的每任务开销快,那么与在相同数量的机器/核心上手动分配工作相比,使用它的性能会更差。在这种情况下,您会遇到这种情况。

在您的分块数据Dask示例中,您只有几个任务,因此您可以从减少的开销中看到更好的性能。但是,与原始多处理相比,Dask的开销可能会对性能造成较小的影响,或者您没有使用Dask集群并在单个进程中运行任务。

多处理器(和Dask(应该有帮助

对于这种令人尴尬的并行问题,多处理的结果通常是出乎意料的。您可能需要确认机器上的物理内核数量,特别是确保没有其他东西在积极利用您的CPU内核。在不知道其他事情的情况下,我想这就是罪魁祸首。

在我有两个物理核心的笔记本电脑上,你的例子是:

  • 单个工艺循环的2min 1s
  • 两道工序1min 2s
  • 四道工序1min
  • 使用nc=2将一个分块的Dask示例拆分为两个块和一个由两个工人和每个工人一个线程组成的LocalCluster,时间为1min 5s。可能值得仔细检查一下您是否在集群上运行

用两个进程获得大约2倍的加速符合我笔记本电脑的预期,因为对于这个CPU受限的任务,看到更多进程带来的好处微乎其微或根本没有。相对于原始多处理,Dask还增加了一些开销。

%%time
​
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s, sys: 1.68 s, total: 1min 57s
Wall time: 2min 1s
%%time
​
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms, sys: 70.8 ms, total: 163 ms
Wall time: 1min 2s
%%time
​
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 118 ms, sys: 94.6 ms, total: 212 ms
Wall time: 1min
from dask.distributed import Client, LocalCluster, wait
client = Client(n_workers=2, threads_per_worker=1)
%%time
​
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks, broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph: 
(array([1.000004, 1.000012, 1.00002 , ..., 4.99998 ... 2, 5.      ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers
future = client.submit(func, big_data)    # bad
big_future = client.scatter(big_data)     # good
future = client.submit(func, big_future)  # good
% (format_bytes(len(b)), s)
CPU times: user 3.67 s, sys: 324 ms, total: 4 s
Wall time: 1min 5s

最新更新