有效地找到numpy数组中按索引划分的子数组的总和



给定一个数组"array"和一组索引"indexes",如何找到以矢量化方式沿这些索引拆分数组所形成的子数组的累积和?为了澄清,假设我有:

>>> array = np.arange(20)
>>> array
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19])
indices = np.arrray([3, 8, 14])

操作应输出:

array([0, 1, 3, 3, 7, 12, 18, 25, 8, 17, 27, 38, 50, 63, 14, 29, 45, 62, 80, 99])

请注意,数组很大(100000个元素),因此,我需要一个矢量化的答案。使用任何循环都会大大降低速度。此外,如果我有同样的问题,但有一个2D数组和相应的索引,并且我需要对数组中的每一行做同样的事情,我该怎么做?

对于2D版本:

>>>array = np.arange(12).reshape((3,4))
>>>array
array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11]])
>>> indices = np.array([[2], [1, 3], [1, 2]])

输出为:

array([[ 0,  1,  3,  3],
       [ 4,  9,  6, 13],
       [ 8, 17, 10, 11]])

澄清一下:每一行都将被拆分。

您可以在indices位置引入原始累积求和数组的微分,以在这些位置创建类似边界的效果,这样,当微分数组被累积求和时,我们就可以得到停止累积求和的索引输出。乍一看,这可能有点像是人为设计的,但坚持下去,尝试其他样本,希望会有意义!这个想法与this other MATLAB solution.中应用的非常相似,因此,遵循这样的理念,这里有一种使用numpy.diffcumulative summation-的方法

# Get linear indices
n = array.shape[1]
lidx = np.hstack(([id*n+np.array(item) for id,item in enumerate(indices)]))
# Get successive differentiations
diffs = array.cumsum(1).ravel()[lidx] - array.ravel()[lidx]
# Get previous group's offsetted summations for each row at all 
# indices positions across the entire 2D array
_,idx = np.unique(lidx/n,return_index=True)
offsetted_diffs = np.diff(np.append(0,diffs))
offsetted_diffs[idx] = diffs[idx]
# Get a copy of input array and place previous group's offsetted summations 
# at indices. Then, do cumulative sum which will create a boundary like 
# effect with those offsets at indices positions.
arrayc = array.copy()
arrayc.ravel()[lidx] -= offsetted_diffs
out = arrayc.cumsum(1)

这应该是一个几乎的矢量化解决方案,这几乎是因为即使我们在循环中计算线性索引,但由于它不是计算密集型的部分,因此它对总运行时间的影响最小。此外,如果您不关心为了节省内存而破坏输入,则可以将arrayc替换为array

样本输入、输出-

In [75]: array
Out[75]: 
array([[ 0,  1,  2,  3,  4,  5,  6,  7],
       [ 8,  9, 10, 11, 12, 13, 14, 15],
       [16, 17, 18, 19, 20, 21, 22, 23]])
In [76]: indices
Out[76]: array([[3, 6], [4, 7], [5]], dtype=object)
In [77]: out
Out[77]: 
array([[ 0,  1,  3,  3,  7, 12,  6, 13],
       [ 8, 17, 27, 38, 12, 25, 39, 15],
       [16, 33, 51, 70, 90, 21, 43, 66]])

您可以使用np.split沿索引拆分数组,然后使用函数map中内置的python将np.cumsum()应用于子数组。并在最后用np.hstack把结果转换成一个集成阵列:

>>> np.hstack(map(np.cumsum,np.split(array,indices)))
array([ 0,  1,  3,  3,  7, 12, 18, 25,  8, 17, 27, 38, 50, 63, 14, 29, 45,
       62, 80, 99])

注意由于map是python中的一个内置函数,并且已经在python解释器内的C中实现,因此它的性能将优于常规循环1

这里有一个2D阵列的替代方案:

>>> def func(array,indices):
...     return np.hstack(map(np.cumsum,np.split(array,indices)))
... 
>>> 
>>> array
array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11]])
>>> 
>>> indices
array([[2], [1, 3], [1, 2]], dtype=object)
>>> np.array([func(arr,ind) for arr,ind in np.array((array,indices)).T])
array([[ 0,  1,  2,  5],
       [ 4,  5, 11,  7],
       [ 8,  9, 10, 21]])

请注意您的预期输出不是基于np.split的工作方式。

如果你想得到这样的结果,你需要在你的指数中加1:

>>> indices = np.array([[3], [2, 4], [2, 3]], dtype=object)
>>> 
>>> np.array([func(arr,ind) for arr,ind in np.array((array,indices)).T])
array([[  0.,   1.,   3.,   3.],
       [  4.,   9.,   6.,  13.],
       [  8.,  17.,  10.,  11.]])

由于有评论说,使用生成器表达式和map函数之间没有性能差异,所以我运行了一个基准测试,它更好地展示了结果。

# Use map
~$ python -m timeit --setup "import numpy as np;array = np.arange(20);indices = np.array([3, 8, 14])" "np.hstack(map(np.cumsum,np.split(array,indices)))"
10000 loops, best of 3: 72.1 usec per loop
# Use generator expression
~$ python -m timeit --setup "import numpy as np;array = np.arange(20);indices = np.array([3, 8, 14])" "np.hstack(np.cumsum(a) for a in np.split(array,indices))"
10000 loops, best of 3: 81.2 usec per loop

注意,这并不意味着使用以C速度执行的map可以使代码以C速度预成型。正因为如此,代码已经在python中实现,调用函数(第一个参数)并将其应用于可迭代项需要时间。

最新更新