我想计算数组中一个范围的总和(简单( - 但我不想这样做,而是 n 次,应该求和的范围来自第二个数组。
我有一个 0 和 1 的 2D 数组:
count = np.array(
[[0,1,0,0,1,0,1],
[0,0,1,1,1,0,0]])
我有一个结构化的 2D 数组,其中包含要对计数数组求和的范围的字段。
dtype=[..., ('ranges', 'u1', (2, 2)) , ...]
table['ranges']
看起来像这样:
[
[[1, 3], [0, 4]],
[[0, 0], [3, 4]],
[[0, 0], [2 4]],
[[0, 0], [3 4]],
[[3, 7], [1 5]]]
(通常这将在 20 到 几百行之间(。
此示例的结果应为
[2, # = (1 +0) + (0 + 0 +1)
1, # = ( ) + (1)
2, # = ( ) + (1 + 1)
1, # = ( ) + (1)
5] # = (0 + 1 +0 +1 ) + (0 + 1 + 1 + 1)
首先,我从:
result = np.zeros(table.size, dtype=np.int)
for index, r in enumerate(table):
for index, range in enumerate(r['ranges']):
result[index] += np.sum(counts[index][range[0]:range[1]])
给出了正确的结果,但不是效率的例子。
我还尝试消除第二个循环并使其更加麻木:
result = np.zeros(table.size, dtype=np.int)
for index, (from1, to1, from2, to2) in
enumerate(np.nditer(table['ranges'], flags=['external_loop'])):
counts[index] += np.sum(counts[0][from1:to1]) +
np.sum(counts[1][from2:to2])
但是这些代码行仍然是应用程序花费大部分时间的一个点。该应用程序比这大得多,但根据分析器的说法,在这些行上花费的时间大约减少了一半。
所以基本上我正在寻找一种方法来摆脱循环并在 numpy 中做到这一点。 我正在寻找类似的东西
counts=np.sum(counts[1][table['ranges'][0][0]:table['ranges'][0][1])+np.sum(counts[2][table['ranges'][1][0]:table['ranges'][1][1])
但到目前为止,还没有真正找到一个好的方法来做到这一点。
更新进行了一些时间比较:
import numpy as np
import timeit as ti
table = np.empty(5,
dtype=[('s1', np.int8),
('ranges', 'u1', (2, 2)),
('s2', np.int16)])
table["ranges"] = [((1, 3), (0, 4)),
((0, 0), (3, 4)),
((0, 0), (2, 4)),
((0, 0), (3, 4)),
((3, 7), (1, 5))]
results = np.zeros(table.size)
counts = np.array([[0, 1, 0, 0, 1, 0, 1],
[0, 0, 1, 1, 1, 0, 0]])
# version one
def rv1(table, counts, results):
for row_index, r in enumerate(table):
for index, crange in enumerate(r['ranges']):
results[row_index] += np.sum(counts[index][crange[0]:crange[1]])
# version two
def rv2(table, counts, results):
for rowindex, (f1, t1, f2, t2) in
enumerate(np.nditer(table['ranges'], flags=['external_loop'])):
results[rowindex] += np.sum(counts[0][f1:t1]) +
np.sum(counts[1][f2:t2])
# version 3 (TomNash)
def rvTN(table, counts, results):
ranges=table["ranges"]
result=[
sum(counts[0][slice(*ranges[i][0])]) + sum(counts[1][slice(*ranges[i][1])])
for i in range(len(ranges))]
results+=result
results = np.zeros(table.size)
rv1(table, counts, results)
print ("rv1 result" , results)
results = np.zeros(table.size)
rv2(table, counts, results)
print ("rv2 result", results)
results = np.zeros(table.size)
rvTN(table, counts, results)
print ("slice*(TN) result", results)
print ("double loop time " , ti.timeit(lambda : rv1(table, counts, results)))
print ("nditer time " , ti.timeit(lambda : rv2(table, counts, results)))
print ("slice* time " , ti.timeit(lambda : rv3(table, counts, results)))
我得到
double loop result [3. 1. 2. 1. 5.]
nditer result [3. 1. 2. 1. 5.]
slice* result [3. 1. 2. 1. 5.]
double loop time 42.41987561201677
nditer time 36.45269059110433
slice* time 24.102186055853963
所以TomNashs版本快了大约30%。不幸的是,这仍然有些缓慢。
在遇到同样的问题后,我找到了两种编写完全矢量化版本的方法。
一种解决方案是在numpy.add
上使用ufunc.reduceat方法,但由于reduceat
行为的奇怪规范,这最终变得相当混乱和低效。关于 numpy 问题 #834 有一些关于添加新的 ufunc 方法的讨论,该方法可以使此任务成为高效的单行代码,但尚未实现任何内容。
我想出的第二个解决方案似乎更有效,也更容易理解,其工作原理如下:
-
首先,使用 numpy.cumsum 获得沿
counts
长轴的累积和,cumulative_counts
。 -
接下来,获取所有起始索引
starts
和结束索引ends
您要求和的范围,这可以通过将数组切成tables['ranges']
来轻松获得。 -
现在,您可以使用
starts
和ends
索引到cumulative_counts
中来获取每个范围开始和结束时的累积总和的数组。 -
最后,要获得每个范围内的总和,只需从结束总和中减去所有起始总和。
有一个轻微的皱纹,那就是为了使索引正确工作,cumulative_counts
需要在第一个索引处有一个零,然后是cumsum
结果。
首先在 1D 中将这个概念放在一起,因为它稍微清晰一些:
for i in range(len(counts)):
cumulative_counts = np.empty(len(counts[i]) + 1, dtype=counts.dtype)
cumulative_counts[0] = 0
cumulative_counts[1:] = np.cumsum(counts[i])
starts, ends = table['ranges'][:,i,:].T
results += cumulative_counts[ends] - cumulative_counts[starts]
这已经更快了 - 循环仅在短轴(2 行counts
(上,其余的被矢量化。
但也可以消除外循环并将相同的方法应用于 2D 输入,从而为您的示例提供完全矢量化的解决方案:
cumulative_counts = np.empty((counts.shape[0], counts.shape[1] + 1), dtype=counts.dtype)
cumulative_counts[:,0] = 0
cumulative_counts[:,1:] = np.cumsum(counts, axis=1)
starts, ends = table['ranges'].T.swapaxes(1, 2)
rows = np.arange(len(counts)).reshape(1,-1)
row_results = cumulative_counts[rows, ends] - cumulative_counts[rows, starts]
results += row_results.sum(axis=1)
将此版本添加到您的时序比较中,它在我的系统上的运行速度比 TomNash 的切片方法快 30%。我希望这种加速比会针对更大的阵列被放大。
rv1 result [3. 1. 2. 1. 5.]
rv2 result [3. 1. 2. 1. 5.]
slice*(TN) result [3. 1. 2. 1. 5.]
cumsum(ML) result [3. 1. 2. 1. 5.]
double loop time 118.32055400998797
nditer time 101.83165721700061
slice* time 50.249228183995
cumsum time 38.694901300012134
您可以使用slice
和*args
来分解开始,停止索引的列表并对其进行切片。
[sum(count[0][slice(*ranges[i][0])]) + sum(count[1][slice(*ranges[i][1])]) for i in range(len(ranges))]
我认为您的预期结果略微偏离了您的指数,这就是我得到的。
结果
[3, 1, 2, 1, 5]