如何在数组中对范围值求和,其中每行都有不同的范围



我想计算数组中一个范围的总和(简单( - 但我不想这样做,而是 n 次,应该求和的范围来自第二个数组。

我有一个 0 和 1 的 2D 数组:

count = np.array(
[[0,1,0,0,1,0,1],
[0,0,1,1,1,0,0]])

我有一个结构化的 2D 数组,其中包含要对计数数组求和的范围的字段。

dtype=[..., ('ranges', 'u1', (2, 2)) , ...]

table['ranges']看起来像这样:

[
[[1, 3], [0, 4]],
[[0, 0], [3, 4]],
[[0, 0], [2 4]],
[[0, 0], [3 4]],
[[3, 7], [1 5]]]

(通常这将在 20 到 几百行之间(。

此示例的结果应为

[2, # = (1 +0) + (0 + 0 +1)
1, # = ( ) + (1)
2,  # = ( ) + (1 + 1)
1, # = ( ) + (1)
5] # = (0 + 1 +0 +1 ) + (0 + 1 + 1 + 1)

首先,我从:

result = np.zeros(table.size, dtype=np.int)
for index, r in enumerate(table):
for index, range in enumerate(r['ranges']):
result[index] += np.sum(counts[index][range[0]:range[1]])

给出了正确的结果,但不是效率的例子。

我还尝试消除第二个循环并使其更加麻木:

result = np.zeros(table.size, dtype=np.int)
for index, (from1, to1, from2, to2) in 
enumerate(np.nditer(table['ranges'], flags=['external_loop'])):
counts[index] += np.sum(counts[0][from1:to1]) +
np.sum(counts[1][from2:to2])

但是这些代码行仍然是应用程序花费大部分时间的一个点。该应用程序比这大得多,但根据分析器的说法,在这些行上花费的时间大约减少了一半。

所以基本上我正在寻找一种方法来摆脱循环并在 numpy 中做到这一点。 我正在寻找类似的东西

counts=np.sum(counts[1][table['ranges'][0][0]:table['ranges'][0][1])+np.sum(counts[2][table['ranges'][1][0]:table['ranges'][1][1])

但到目前为止,还没有真正找到一个好的方法来做到这一点。

更新进行了一些时间比较:

import numpy as np
import timeit as ti
table = np.empty(5,
dtype=[('s1', np.int8),
('ranges', 'u1', (2, 2)),
('s2', np.int16)])
table["ranges"] = [((1, 3), (0, 4)),
((0, 0), (3, 4)),
((0, 0), (2, 4)),
((0, 0), (3, 4)),
((3, 7), (1, 5))]
results = np.zeros(table.size)
counts = np.array([[0, 1, 0, 0, 1, 0, 1],
[0, 0, 1, 1, 1, 0, 0]])

# version one
def rv1(table, counts, results):
for row_index, r in enumerate(table):
for index, crange in enumerate(r['ranges']):
results[row_index] += np.sum(counts[index][crange[0]:crange[1]])

# version two
def rv2(table, counts, results):
for rowindex, (f1, t1, f2, t2) in 
enumerate(np.nditer(table['ranges'], flags=['external_loop'])):
results[rowindex] += np.sum(counts[0][f1:t1]) +
np.sum(counts[1][f2:t2])

# version 3 (TomNash)
def rvTN(table, counts, results):
ranges=table["ranges"]
result=[
sum(counts[0][slice(*ranges[i][0])]) + sum(counts[1][slice(*ranges[i][1])])
for i in range(len(ranges))]
results+=result

results = np.zeros(table.size)
rv1(table, counts, results)
print ("rv1 result" , results)

results = np.zeros(table.size)
rv2(table, counts, results)
print ("rv2 result", results)
results = np.zeros(table.size)
rvTN(table, counts, results)
print ("slice*(TN) result", results)

print ("double loop time " , ti.timeit(lambda : rv1(table, counts, results)))
print ("nditer time " ,  ti.timeit(lambda : rv2(table, counts, results)))
print ("slice* time " ,  ti.timeit(lambda : rv3(table, counts, results)))

我得到

double loop result [3. 1. 2. 1. 5.]
nditer result [3. 1. 2. 1. 5.]
slice* result [3. 1. 2. 1. 5.]
double loop time  42.41987561201677
nditer time  36.45269059110433
slice* time  24.102186055853963

所以TomNashs版本快了大约30%。不幸的是,这仍然有些缓慢。

在遇到同样的问题后,我找到了两种编写完全矢量化版本的方法。

一种解决方案是在numpy.add上使用ufunc.reduceat方法,但由于reduceat行为的奇怪规范,这最终变得相当混乱和低效。关于 numpy 问题 #834 有一些关于添加新的 ufunc 方法的讨论,该方法可以使此任务成为高效的单行代码,但尚未实现任何内容。

我想出的第二个解决方案似乎更有效,也更容易理解,其工作原理如下:

  • 首先,使用 numpy.cumsum 获得沿counts长轴的累积和,cumulative_counts

  • 接下来,获取所有起始索引starts和结束索引ends您要求和的范围,这可以通过将数组切成tables['ranges']来轻松获得。

  • 现在,您可以使用startsends索引到cumulative_counts中来获取每个范围开始和结束时的累积总和的数组。

  • 最后,要获得每个范围内的总和,只需从结束总和中减去所有起始总和。

有一个轻微的皱纹,那就是为了使索引正确工作,cumulative_counts需要在第一个索引处有一个零,然后是cumsum结果。

首先在 1D 中将这个概念放在一起,因为它稍微清晰一些:

for i in range(len(counts)):
cumulative_counts = np.empty(len(counts[i]) + 1, dtype=counts.dtype)
cumulative_counts[0] = 0
cumulative_counts[1:] = np.cumsum(counts[i])
starts, ends = table['ranges'][:,i,:].T
results += cumulative_counts[ends] - cumulative_counts[starts]

这已经更快了 - 循环仅在短轴(2 行counts(上,其余的被矢量化。

但也可以消除外循环并将相同的方法应用于 2D 输入,从而为您的示例提供完全矢量化的解决方案:

cumulative_counts = np.empty((counts.shape[0], counts.shape[1] + 1), dtype=counts.dtype)
cumulative_counts[:,0] = 0
cumulative_counts[:,1:] = np.cumsum(counts, axis=1)
starts, ends = table['ranges'].T.swapaxes(1, 2)
rows = np.arange(len(counts)).reshape(1,-1)
row_results = cumulative_counts[rows, ends] - cumulative_counts[rows, starts]
results += row_results.sum(axis=1)

将此版本添加到您的时序比较中,它在我的系统上的运行速度比 TomNash 的切片方法快 30%。我希望这种加速比会针对更大的阵列被放大。

rv1 result [3. 1. 2. 1. 5.]
rv2 result [3. 1. 2. 1. 5.]
slice*(TN) result [3. 1. 2. 1. 5.]
cumsum(ML) result [3. 1. 2. 1. 5.]
double loop time  118.32055400998797
nditer time  101.83165721700061
slice* time  50.249228183995
cumsum time  38.694901300012134

您可以使用slice*args来分解开始,停止索引的列表并对其进行切片。

[sum(count[0][slice(*ranges[i][0])]) + sum(count[1][slice(*ranges[i][1])]) for i in range(len(ranges))]

我认为您的预期结果略微偏离了您的指数,这就是我得到的。

结果

[3, 1, 2, 1, 5]

最新更新