从numpy数组中快速选择,无需中间索引数组



给定以下两列数组,我希望从第二列中选择与第一列中的"边"相对应的项。这只是一个例子,因为实际上我的a可能有数百万行。所以,理想情况下,我希望尽可能快地完成这项工作,而不会产生中间结果。

import numpy as np
a = np.array([[1,4],[1,2],[1,3],[2,6],[2,1],[2,8],[2,3],[2,1],
              [3,6],[3,7],[5,4],[5,9],[5,1],[5,3],[5,2],[8,2],
              [8,6],[8,8]])

即我想找到结果,

desired = np.array([4,6,6,4,2])

其是CCD_ 2中与CCD_。

一种解决方案是

b = a[(a[1:,0]-a[:-1,0]).nonzero()[0]+1, 1]

这给出了np.array([6,6,4,2]),我可以简单地准备第一个项目,没有问题。然而,这会创建第一个项的索引的中间数组。我可以通过使用列表理解来避免中间阶段:

c = [a[i+1,1] for i,(x,y) in enumerate(zip(a[1:,0],a[:-1,0])) if x!=y]

这也给出了CCD_ 5。假设是基于生成器的zip(在Python3中为true),则不需要创建中间表示,并且应该非常节省内存。然而,内部循环不是numpy,它需要生成一个列表,该列表随后必须返回到numpy数组中。

你能想出一个只有numpy的版本,它的内存效率是c,但速度效率却是b吗?理想情况下,只需要通过a一次。

(请注意,除非a非常大,否则测量速度在这里没有多大帮助,所以我不想对其进行基准测试,我只想要理论上快速且内存高效的东西。例如,你可以假设a中的行是从一个文件流式传输的,访问速度很慢——这是避免使用b解决方案的另一个原因,因为它需要在a上进行第二次随机访问。)。)

编辑:一种生成用于测试的大型a矩阵的方法:

from itertools import repeat
N, M = 100000, 100
a = np.array(zip([x for y in zip(*repeat(np.arange(N),M)) for x in y ], np.random.random(N*M)))

如果你想以矢量化的方式实现这一点,恐怕你无法避免中间数组,因为它没有内置的。

现在,让我们寻找nonzero()以外的矢量化方法,它可能更具性能。按照与(a[1:,0]-a[:-1,0])的原始代码相同的执行微分的想法,我们可以在寻找对应于"边"或移位的非零微分后使用布尔索引。

因此,我们将有一种像so-这样的矢量化方法

a[np.append(True,np.diff(a[:,0])!=0),1]

运行时测试

原始解决方案a[(a[1:,0]-a[:-1,0]).nonzero()[0]+1,1]将跳过第一行。但是,为了时间的目的,我们只能说,这是一个有效的结果。以下是针对这个后中提出的解决方案的运行时

In [118]: from itertools import repeat
     ...: N, M = 100000, 2
     ...: a = np.array(zip([x for y in zip(*repeat(np.arange(N),M))
                              for x in y ], np.random.random(N*M)))
     ...: 
In [119]: %timeit a[(a[1:,0]-a[:-1,0]).nonzero()[0]+1,1]
100 loops, best of 3: 6.31 ms per loop
In [120]: %timeit a[1:][np.diff(a[:,0])!=0,1]
100 loops, best of 3: 4.51 ms per loop

现在,假设您也想包括第一行。更新后的运行时看起来像这样-

In [123]: from itertools import repeat
     ...: N, M = 100000, 2
     ...: a = np.array(zip([x for y in zip(*repeat(np.arange(N),M))
                              for x in y ], np.random.random(N*M)))
     ...: 
In [124]: %timeit a[np.append(0,(a[1:,0]-a[:-1,0]).nonzero()[0]+1),1]
100 loops, best of 3: 6.8 ms per loop
In [125]: %timeit a[np.append(True,np.diff(a[:,0])!=0),1]
100 loops, best of 3: 5 ms per loop

好的,实际上我找到了一个解决方案,刚刚了解了np.fromiter,它可以基于生成器构建numpy数组:

d = np.fromiter((a[i+1,1] for i,(x,y) in enumerate(zip(a[1:,0],a[:-1,0])) if x!=y), int)

我认为这可以做到,生成一个没有任何中间数组的numpy数组。然而,需要注意的是,它似乎并没有那么有效!忘记了我在关于测试的问题中所说的:

t = [lambda a: a[(a[1:,0]-a[:-1,0]).nonzero()[0]+1, 1],
     lambda a: np.array([a[i+1,1] for i,(x,y) in enumerate(zip(a[1:,0],a[:-1,0])) if x!=y]),
     lambda a: np.fromiter((a[i+1,1] for i,(x,y) in enumerate(zip(a[1:,0],a[:-1,0])) if x!=y), int)]
from timeit import Timer
[Timer(x(a)).timeit(number=10) for x in t]
[0.16596235800034265, 1.811289312000099, 2.1662971739997374]

看来第一个解决方案要快得多!我认为这是因为即使它生成中间数据,它也能够完全用numpy执行内部循环,而在另一种情况下,它为数组中的每个项运行Python代码。

正如我所说,这就是为什么我不确定这种基准测试在这里是否有意义——如果对a的访问速度慢得多,那么基准测试就不会加载CPU。想法?

不"接受"这个答案,因为我希望有人能更快地想出办法。

如果内存效率是您关心的问题,那么可以这样解决:与输入数据大小顺序相同的唯一中间数据可以是bool类型(a[1:,0]!=a[:-1,0]);如果您的输入数据是int32,那么它比"a"本身小8倍。您可以对该二进制数组的非零进行计数,以预分配输出数组;尽管如果!=的输出正如您的示例所显示的那样稀疏。

最新更新