查找数据流中迄今为止排名前K位的最小值



假设我有一个数据流,其中一次检索单个数据点:

import numpy as np
def next_data_point():
"""
Mock a data stream. Data points will always be a positive float
"""
return np.random.uniform(0, 1_000_000, dtype='float')

我需要能够更新NumPy数组,并跟踪到目前为止该流的前K个最小值(或者直到用户决定何时可以通过某种check_stop_condition()函数停止分析(。假设我们想从流中捕获前1000个最小值,那么实现这一点的一种简单方法可能是:

k = 1000
topk = np.full(k, fille_value=np.inf, dtype='float')
while check_stop_condition():
topk[:] = np.sort(np.append(topk, next_data_point()))[:k]

这很好,但效率很低,如果重复数百万次可能会很慢,因为我们是:

  1. 每次创建一个新数组
  2. 每次对连接的数组进行排序

因此,我想出了一种不同的方法来解决这两种效率低下的问题:

k = 1000
topk = np.full(k, fille_value=np.inf)
while check_stop_condition():
data_point = next_data_point()
idx = np.searchsorted(topk, data_point)
if idx < k:
topk[idx : -1] = topk[idx + 1 :] 
topk[idx] = data_point 

在这里,我利用np.searchsorted()来替换np.sort,并快速找到下一个数据点的插入点idx。我相信np.searchsorted使用了某种二进制搜索,并假设初始数组首先是预排序的。然后,我们移动topk中的数据以适应并插入新的数据点,当且仅当idx < k

我在任何地方都没有看到这样做,所以我的问题是,是否有什么可以做的来提高效率?尤其是我在if语句中转换内容的方式。

对一个巨大的数组进行排序非常昂贵,所以第二种方法更快也就不足为奇了。然而,第二种方法的速度可能受到慢速数组复制的限制。第一种方法的复杂性为O(k log(k) n),而第二种方法的复杂度为O(n (log(k) + k * p)),其中n是点数,p是要采取的分支的概率。

要构建更快的实现,可以使用。例如,更具体地说是一个自平衡二进制搜索树。以下是算法:

topk = Tree()
maxi = np.inf
while check_stop_condition():             # O(n)
data_point = next_data_point()
if len(topk) <= 1000:                 # O(1)
topk.insert(data_point)           # O(log k)
elif data_point < maxi:               # Discard the value in O(1)
topk.insert(data_point)           # O(log k)
topk.deleteMaxNode()              # O(log k)
maxi = topk.findMaxValue()        # O(log k)

上述算法在O(n log k)中运行。可以证明这种复杂性是最优的(仅使用data_point比较(。

在实践中,二进制堆可以更快一点(具有相同的复杂性(。事实上,在这种情况下,它们比自平衡二进制搜索树有几个优势:

  • 它们可以在内存中以非常紧凑的方式实现(减少缓存未命中和内存消耗(
  • CCD_ 15第一项的插入可以在CCD_ 16时间内非常快速地完成

请注意,丢弃的值是在恒定时间内计算的。这在巨大的随机数据集上增加了很多,因为大多数值很快就会大于maxi。On甚至可以证明随机数据集可以在O(n)时间内计算(最优(。

请注意,Python3提供了一个名为heapq的标准堆实现,这可能是一个很好的起点。

最新更新