基于时间的滑动窗口和测量(变化)数据到达率



我正在尝试实现一个基于时间的滑动窗口(在Python中),即数据源插入新的数据项,并且比1小时更早的项被自动删除。最重要的是,我需要测量速率,或者更确切地说,是数据源插入项的速率变化。

我的问题有两个方面。首先,实现基于时间的窗口的最佳方式是什么?在我目前可能很幼稚的解决方案中,我简单地使用了一个Python列表window = []。在新数据item的情况下,我用当前时间戳附加项目:window.append((current_time, item))。然后,使用计时器,每隔1秒I pop所有具有比当前(时间戳-1h)更早时间戳的第一个元素:

threshold = int(time.time()*1000) - self.WINDOW_SIZE_IN_MS
while True:
    try:
        if window[0][0] < threshold:
            del self.word_lists[0]
        else:
            break            
    except:
        break

虽然这是有效的,但我想知道是否有更聪明的解决方案。

更重要的是,测量进入窗口的数据项的速率变化的好方法是什么?在这里,我不知道如何处理这个问题,至少没有一个听起来同样有效的方法。我想到了一些非常幼稚的东西:我将h窗口分成20个间隔,每隔5分钟计算物品的数量。如果最近的5分钟间隔包含的项目明显多于20个间隔的平均值,我就说这是一个爆发。但是我必须每隔1分钟做一次。这听起来效率不高,而且有很多参数。

简而言之,我需要测量新项目进入窗口的加速度。有最佳实践方法吗?

对于第一部分,检查过期的项并在收到要添加的新项时删除它们更有效。也就是说,不要麻烦计时器,它每秒会无故唤醒进程一次-只需在真正的工作发生时承担维护工作。

对于第二部分,整个1小时的长度是已知的。存储一个整数,它是五分钟前列表中的索引。您可以在执行插入操作时维护它,并且您知道您只需要将它向前移动。

把这些放在一起,伪代码:
window = []
recent_index = 0
def insert(time, item):
    while window and window[0][0] < time - timedelta(hours=1):
        window.pop()
        recent_index -= 1
    while window[recent_index][0] < time - timedelta(minutes=5):
        recent_index += 1
    window.append((time, item))
    return float(len(window) - recent_index) / len(window)

上面的函数返回过去一小时内在过去五分钟内到达的物品的百分比。如果超过20%或50%,就会出现爆屏

相关内容

  • 没有找到相关文章

最新更新