在相同长度的1d numpy数组上评估一维函数数组的有效算法



我有一个由k个不同函数组成的(大)长度为N的数组,以及一个由abcissa组成的长度为N数组。我想评估abcissa上的函数,以返回一个长度为N的坐标数组,关键是,我需要非常快地完成。

我尝试了以下循环调用np.where,它太慢了:

创建一些虚假数据来说明问题:

def trivial_functional(i): return lambda x : i*x
k = 250
func_table = [trivial_functional(j) for j in range(k)]
func_table = np.array(func_table) # possibly unnecessary

我们有一个由250个不同函数组成的表。现在,我创建了一个大数组,其中包含这些函数的许多重复条目,以及一组长度相同的点,这些函数应该在这些点上求值。

Npts = 1e6
abcissa_array = np.random.random(Npts)
function_indices = np.random.random_integers(0,len(func_table)-1,Npts)
func_array = func_table[function_indices]

最后,对数据使用的每个函数进行循环,并在相关点集上对其进行评估:

desired_output = np.zeros(Npts)
for func_index in set(function_indices):
    idx = np.where(function_indices==func_index)[0]
    desired_output[idx] = func_table[func_index](abcissa_array[idx])

这个循环在我的笔记本电脑上大约需要0.35秒,这是我代码中一个数量级的最大瓶颈。

有人知道如何避免对np.where的盲查找调用吗?有没有一种巧妙的方法可以加快这种循环?

这与你(出色!)的自我回答几乎相同,但少了一点讽刺。在我的机器上,它似乎也快了一点——粗略测试大约30毫秒。

def apply_indexed_fast(array, func_indices, func_table):
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(array)
    for f, start, end in zip(func_table, func_ranges, func_ranges[1:]):
        ix = func_argsort[start:end]
        out[ix] = f(array[ix])
    return out

与您的一样,这会将argsort索引序列拆分为块,每个块对应于func_table中的一个函数。然后,它使用每个块为其相应的函数选择输入和输出索引。为了确定块边界,它使用np.searchsorted而不是np.unique——其中searchsorted(a, b)可以被认为是一种二进制搜索算法,它返回a中第一个值等于或大于b中给定值的索引。

然后,zip函数简单地并行迭代其参数,从每个参数中返回一个单独的项,这些项被收集在一个元组中,并将它们串在一起形成一个列表。(因此zip([1, 2, 3], ['a', 'b', 'c'], ['b', 'c', 'd'])返回[(1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')]。)这与for语句内置的"解包"这些元组的能力一起,允许以一种简洁但富有表现力的方式并行迭代多个序列。

在本例中,我使用它来迭代func_tables中的函数以及func_ranges的两个不同步副本。这确保了end变量中func_ranges中的项目始终领先start变量中的项目一步。通过将None附加到func_ranges,我确保了最终块的处理是优雅的——当zip的任何一个参数的项用完时,它就会停止,从而切断序列中的最终值。方便的是,None值还可以用作开放切片索引!

做同样事情的另一个技巧需要更多的行,但内存开销较低,尤其是当与zipizip:的itertools等价物一起使用时

range_iter_a = iter(func_ranges)   # create generators that iterate over the 
range_iter_b = iter(func_ranges)   # values in `func_ranges` without making copies
next(range_iter_b, None)           # advance the second generator by one
for f, start, end in itertools.izip(func_table, range_iter_a, range_iter_b):
    ...

然而,这些基于生成器的低开销方法有时可能比普通列表慢一点。另外,请注意,在Python3中,zip的行为更像izip

感谢hpaulj建议采用groupby方法。这个操作有很多固定的例程,比如Pandas DataFrames,但它们都伴随着数据结构初始化的开销,这只是一次,但如果只用于一次计算,可能会很昂贵。

这是我的纯numpy解决方案,它比我使用的循环的原始快13倍结果总结是我使用了np.argsortnp.unique以及一些花哨的索引体操。

首先,我们对函数索引进行排序,然后找到排序数组中每个新索引开始的元素

idx_funcsort = np.argsort(function_indices)
unique_funcs, unique_func_indices = np.unique(function_indices[idx_funcsort], return_index=True)

现在不再需要盲查找,因为我们确切地知道排序数组的哪个切片对应于每个唯一的函数。因此,我们仍然在每个被调用的函数上循环,但不调用,其中:

for func_index in range(len(unique_funcs)-1):
    idx_func = idx_funcsort[unique_func_indices[func_index]:unique_func_indices[func_index+1]]
    func = func_table[unique_funcs[func_index]]
    desired_output[idx_func] = func(abcissa_array[idx_func])

这涵盖了除最终索引之外的所有索引,由于Python索引约定,我们需要单独调用它,这有点令人恼火:

func_index = len(unique_funcs)-1
idx_func = idx_funcsort[unique_func_indices[func_index]:]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])

这给出了与where循环(记账健全性检查)相同的结果,但该循环的运行时间为0.027秒,比我最初的计算速度提高了13倍。

这是函数编程在Python中被模仿的一个很好的例子。

现在,如果您想将函数应用于一组点,我推荐numpyufunc框架,它将允许您创建函数的快速矢量化版本。

最新更新