我有一个由k个不同函数组成的(大)长度为N的数组,以及一个由abcissa组成的长度为N数组。我想评估abcissa上的函数,以返回一个长度为N的坐标数组,关键是,我需要非常快地完成。
我尝试了以下循环调用np.where,它太慢了:
创建一些虚假数据来说明问题:
def trivial_functional(i): return lambda x : i*x
k = 250
func_table = [trivial_functional(j) for j in range(k)]
func_table = np.array(func_table) # possibly unnecessary
我们有一个由250个不同函数组成的表。现在,我创建了一个大数组,其中包含这些函数的许多重复条目,以及一组长度相同的点,这些函数应该在这些点上求值。
Npts = 1e6
abcissa_array = np.random.random(Npts)
function_indices = np.random.random_integers(0,len(func_table)-1,Npts)
func_array = func_table[function_indices]
最后,对数据使用的每个函数进行循环,并在相关点集上对其进行评估:
desired_output = np.zeros(Npts)
for func_index in set(function_indices):
idx = np.where(function_indices==func_index)[0]
desired_output[idx] = func_table[func_index](abcissa_array[idx])
这个循环在我的笔记本电脑上大约需要0.35秒,这是我代码中一个数量级的最大瓶颈。
有人知道如何避免对np.where的盲查找调用吗?有没有一种巧妙的方法可以加快这种循环?
这与你(出色!)的自我回答几乎相同,但少了一点讽刺。在我的机器上,它似乎也快了一点——粗略测试大约30毫秒。
def apply_indexed_fast(array, func_indices, func_table):
func_argsort = func_indices.argsort()
func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
func_ranges.append(None)
out = np.zeros_like(array)
for f, start, end in zip(func_table, func_ranges, func_ranges[1:]):
ix = func_argsort[start:end]
out[ix] = f(array[ix])
return out
与您的一样,这会将argsort
索引序列拆分为块,每个块对应于func_table
中的一个函数。然后,它使用每个块为其相应的函数选择输入和输出索引。为了确定块边界,它使用np.searchsorted
而不是np.unique
——其中searchsorted(a, b)
可以被认为是一种二进制搜索算法,它返回a
中第一个值等于或大于b
中给定值的索引。
然后,zip函数简单地并行迭代其参数,从每个参数中返回一个单独的项,这些项被收集在一个元组中,并将它们串在一起形成一个列表。(因此zip([1, 2, 3], ['a', 'b', 'c'], ['b', 'c', 'd'])
返回[(1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')]
。)这与for
语句内置的"解包"这些元组的能力一起,允许以一种简洁但富有表现力的方式并行迭代多个序列。
在本例中,我使用它来迭代func_tables
中的函数以及func_ranges
的两个不同步副本。这确保了end
变量中func_ranges
中的项目始终领先start
变量中的项目一步。通过将None
附加到func_ranges
,我确保了最终块的处理是优雅的——当zip
的任何一个参数的项用完时,它就会停止,从而切断序列中的最终值。方便的是,None
值还可以用作开放切片索引!
做同样事情的另一个技巧需要更多的行,但内存开销较低,尤其是当与zip
、izip
:的itertools
等价物一起使用时
range_iter_a = iter(func_ranges) # create generators that iterate over the
range_iter_b = iter(func_ranges) # values in `func_ranges` without making copies
next(range_iter_b, None) # advance the second generator by one
for f, start, end in itertools.izip(func_table, range_iter_a, range_iter_b):
...
然而,这些基于生成器的低开销方法有时可能比普通列表慢一点。另外,请注意,在Python3中,zip
的行为更像izip
。
感谢hpaulj建议采用groupby方法。这个操作有很多固定的例程,比如Pandas DataFrames,但它们都伴随着数据结构初始化的开销,这只是一次,但如果只用于一次计算,可能会很昂贵。
这是我的纯numpy解决方案,它比我使用的循环的原始快13倍结果总结是我使用了np.argsort和np.unique以及一些花哨的索引体操。
首先,我们对函数索引进行排序,然后找到排序数组中每个新索引开始的元素
idx_funcsort = np.argsort(function_indices)
unique_funcs, unique_func_indices = np.unique(function_indices[idx_funcsort], return_index=True)
现在不再需要盲查找,因为我们确切地知道排序数组的哪个切片对应于每个唯一的函数。因此,我们仍然在每个被调用的函数上循环,但不调用,其中:
for func_index in range(len(unique_funcs)-1):
idx_func = idx_funcsort[unique_func_indices[func_index]:unique_func_indices[func_index+1]]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])
这涵盖了除最终索引之外的所有索引,由于Python索引约定,我们需要单独调用它,这有点令人恼火:
func_index = len(unique_funcs)-1
idx_func = idx_funcsort[unique_func_indices[func_index]:]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])
这给出了与where循环(记账健全性检查)相同的结果,但该循环的运行时间为0.027秒,比我最初的计算速度提高了13倍。
这是函数编程在Python中被模仿的一个很好的例子。
现在,如果您想将函数应用于一组点,我推荐numpy
的ufunc
框架,它将允许您创建函数的快速矢量化版本。