矢量化功能有问题



对于一个项目,我需要从函数生成样本。我希望能够尽快生成这些样本。

我有这个例子(在最终版本中,参数中将提供functionlambda(。目标是使用lambdafunctionstartstop之间生成n个点的线间距xsys

def get_ys(coefficients, num_outputs=20, start=0., stop=1.):
function = lambda x, args: args[0]*(x-args[1])**2 + args[2]*(x-args[3]) + args[4]
xs = np.linspace(start, stop, num=num_outputs, endpoint=True)
ys = [function(x, coefficients) for x in xs]
return ys
%%time
n = 1000
xs = np.random.random((n,5))
ys = np.apply_along_axis(get_ys, 1, xs)
Wall time: 616 ms

我正在尝试将其矢量化,并找到了numpy.apply_along_axis

%%time
for i in range(1000):
xs = np.random.random(5)
ys = get_ys(xs)
Wall time: 622 ms

不幸的是,它仍然很慢:/

我对函数矢量化不太熟悉,有人能指导我如何提高脚本的速度吗?

谢谢!

编辑:输入/输出示例:

xs = np.ones(5)
ys = get_ys(xs)
[1.0, 0.9501385041551247, 0.9058171745152355, 0.8670360110803323, 0.8337950138504155,0.8060941828254848, 0.7839335180055402, 0.7673130193905817, 0.7562326869806094, 0.7506925207756232, 0.7506925207756232, 0.7562326869806094, 0.7673130193905817, 0.7839335180055401, 0.8060941828254847, 0.8337950138504155,  0.8670360110803323, 0.9058171745152354, 0.9501385041551246, 1.0]
def get_ys(coefficients, num_outputs=20, start=0., stop=1.):
function = lambda x, args: args[0]*(x-args[1])**2 + args[2]*(x-args[3]) + args[4]
xs = np.linspace(start, stop, num=num_outputs, endpoint=True)
ys = [function(x, coefficients) for x in xs]
return ys

您试图绕过调用get_ys1000次,每行调用xs一次。

xs作为一个整体传递给get_ys需要什么?换句话说,如果coefficients是(n,5(而不是(5,(呢?

xs是(20,(,而ys将是相同的(对吧(?

lambda被写入以期望标量x和(5,(args。它可以更改为与(20,(x和(n,5(args一起工作吗?

作为第一步,如果给定xsfunction会产生什么?这不是

ys = [function(x, coefficients) for x in xs]
ys = function(xs, coefficients)

编写时,您的代码迭代(以慢Python速度(n(1000(行和20个linspace。因此function被调用了20000次。这就是使代码变慢的原因。

让我们试试这个改变

使用您的功能运行示例:

In [126]: np.array(get_ys(np.arange(5)))
Out[126]: 
array([-2.        , -1.89473684, -1.78947368, -1.68421053, -1.57894737,
-1.47368421, -1.36842105, -1.26315789, -1.15789474, -1.05263158,
-0.94736842, -0.84210526, -0.73684211, -0.63157895, -0.52631579,
-0.42105263, -0.31578947, -0.21052632, -0.10526316,  0.        ])

将列表理解替换为只调用function:

In [127]: def get_ys1(coefficients, num_outputs=20, start=0., stop=1.):
...:     function = lambda x, args: args[0]*(x-args[1])**2 + args[2]*(x-args[3]) + args[4]
...: 
...:     xs = np.linspace(start, stop, num=num_outputs, endpoint=True)
...:     ys = function(xs, coefficients)
...:     return ys
...: 
...: 

相同值:

In [128]: get_ys1(np.arange(5))
Out[128]: 
array([-2.        , -1.89473684, -1.78947368, -1.68421053, -1.57894737,
-1.47368421, -1.36842105, -1.26315789, -1.15789474, -1.05263158,
-0.94736842, -0.84210526, -0.73684211, -0.63157895, -0.52631579,
-0.42105263, -0.31578947, -0.21052632, -0.10526316,  0.        ])

比较时间:

In [129]: timeit np.array(get_ys(np.arange(5)))
345 µs ± 16.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [130]: timeit get_ys1(np.arange(5))
89.2 µs ± 162 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

这就是我们所说的";矢量化"-将python级别的迭代(列表理解(替换为等价的迭代,使numpy数组方法的用户更加充分。

我怀疑我们可以继续使用(n,5(coefficients,但这应该足以让您开始。

完全矢量化

通过broadcasting(n,5(对(20,(,我可以得到一个没有任何python循环的函数:

def get_ys2(coefficients, num_outputs=20, start=0., stop=1.):
function = lambda x, args: args[:,0]*(x-args[:,1])**2 + args[:,2]*(x-args[:,3]) + args[:,4]
xs = np.linspace(start, stop, num=num_outputs, endpoint=True)
ys = function(xs[:,None], coefficients)
return ys.T

并且具有(1,5(输入:

In [156]: get_ys2(np.arange(5)[None,:])
Out[156]: 
array([[-2.        , -1.89473684, -1.78947368, -1.68421053, -1.57894737,
-1.47368421, -1.36842105, -1.26315789, -1.15789474, -1.05263158,
-0.94736842, -0.84210526, -0.73684211, -0.63157895, -0.52631579,
-0.42105263, -0.31578947, -0.21052632, -0.10526316,  0.        ]])

使用您的测试用例:

In [146]: n = 1000
...: xs = np.random.random((n,5))
...: ys = np.apply_along_axis(get_ys, 1, xs)
In [147]: ys.shape
Out[147]: (1000, 20)

两个时间:

In [148]: timeit ys = np.apply_along_axis(get_ys, 1, xs)
...: 
106 ms ± 303 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [149]: timeit ys = np.apply_along_axis(get_ys1, 1, xs)
...: 
88 ms ± 98.3 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

并测试这个

In [150]: ys2 = get_ys2(xs)
In [151]: ys2.shape
Out[151]: (1000, 20)
In [152]: np.allclose(ys, ys2)
Out[152]: True
In [153]: timeit ys2 = get_ys2(xs)
424 µs ± 484 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

它与价值观相匹配,并大大提高了速度。

在新函数中,args现在可以是(n,5(。如果x是(20,1(,结果是(20,n(,我在返回时将其转置。

最新更新