我有一个web服务器,它只支持一个非常简单的API-计算在最后一小时、分钟和秒内收到的请求数。该服务器在世界上非常流行,每秒接收数千个请求。
目的是找到如何准确地将这3个值返回到每个请求?
请求总是来的,所以每个请求一小时、一分钟和一秒钟的窗口是不同的。如何管理每个请求的不同窗口,以便每个请求的计数正确?
如果需要100%的准确度:
有一个所有请求和3个计数的链接列表——最后一小时、最后一分钟和最后一秒。
你将有两个指针进入链表——一分钟前和一秒钟前。
一个小时前将在列表的末尾。每当上次请求的时间比当前时间早一个多小时时,请将其从列表中删除,并递减小时计数。
分钟指针和秒指针将分别指向一分钟后和一秒钟前发生的第一个请求。每当请求的时间比当前时间早一分钟/秒时,向上移动指针并递减分钟/秒计数。
当收到新请求时,将其添加到所有3个计数中,并将其添加至链表的前面。
请求计数只需返回计数即可。
上述所有业务均按固定时间摊销。
如果精度低于100%是可以接受的:
上面的空间复杂性可能有点大,这取决于你通常每秒会收到多少请求;你可以通过稍微牺牲准确性来减少这种情况,如下所示:
有一个如上所述的链接列表,但仅限于最后一秒。还有3个计数。
然后有一个由60个元素组成的圆形阵列,指示最后60秒中每一秒的计数。每当过了一秒,从分钟计数中减去数组的最后一个(最旧的)元素,并将最后一秒计数添加到数组中。
在最后60分钟内使用类似的圆形阵列。
精度损失:分钟计数可在一秒内被所有请求关闭,小时计数可在1分钟内被所有请求取消。
显然,如果你每秒只有一个请求或更少,这就没有意义了。在这种情况下,您可以将最后一分钟保留在链表中,并在最后60分钟使用一个循环数组。
这方面还有其他变化——精度与空间使用率可以根据需要进行调整。
删除旧元素的计时器:
如果只有当新元素进入时才移除旧元素,则它将按恒定时间摊销(有些操作可能需要更长的时间,但它将平均为恒定时间)。
如果你想要真正的恒定时间,你可以另外运行一个计时器,它会删除旧的元素,每次调用(当然还有插入和检查计数)只需要恒定的时间,因为你最多会删除自上次计时器计时以来在恒定时间内插入的一些元素。
要在T秒的时间窗口内完成此操作,请使用队列数据结构,在其中对单个请求到达时的时间戳进行排队。当您想读取在最近的T秒窗口内到达的请求数时,首先从队列的"旧"端删除那些早于T秒的时间戳,然后读取队列的大小。无论何时向队列添加新请求,都应该删除元素,以保持其大小有界(假设传入请求的速率有界)。
该解决方案可达到任意精度,例如毫秒精度。如果您满足于返回近似答案,您可以(例如)在T=3600(一小时)的时间窗口内,将同一秒内的请求合并到一个队列元素中,使队列大小以3600为界。我认为这很好,但理论上会失去准确性。对于T=1,如果您愿意,可以在毫秒级别上进行合并。
在伪代码中:
queue Q
proc requestReceived()
Q.insertAtFront(now())
collectGarbage()
proc collectGarbage()
limit = now() - T
while (! Q.empty() && Q.lastElement() < limit)
Q.popLast()
proc count()
collectGarbage()
return Q.size()
为什么不使用循环数组?这个数组中有3600个元素。
index = 0;
Array[index % 3600] = count_in_one_second.
++index;
如果您想要最后一秒,请返回此数组的最后一个元素。如果您想要最后一分钟,请返回最后60个元素的总和。如果您想要最后一个小时,请返回整个数组(3600个元素)的总和。
这不是一个简单有效的解决方案吗?
感谢
Deryk
一个解决方案如下:
- 使用长度为3600的圆形数组(一小时内60*60秒)来保存最后一小时内每秒的数据
若要记录新一秒的数据,请通过移动圆形数组的头指针将上一秒数据放入圆形数组中。
- 在循环数组的每个元素中,我们不保存特定秒内的请求数,而是记录之前看到的请求数的累积和,并且一段时间内的请求数量可以通过
requests_sum.get(current_second) - requests_sum.get(current_second - number_of_seconds_in_this_period)
计算
像increment()
、getCountForLastMinute()
、getCountForLastHour()
这样的所有操作都可以在O(1)
时间内完成。
================================================
下面是一个如何工作的例子。
如果我们在最近3秒内有这样的请求计数:
1st second: 2 requests
2nd second: 4 requests
3rd second: 3 requests
圆形数组如下所示:sum = [2, 6, 9]
其中6=4+2和9=2+4+3
在这种情况下:
如果您想获得最后一秒的请求计数(第三秒的请求数),只需计算
sum[2] - sum[1] = 9 - 6 = 3
如果您想获得最后两秒的请求计数(第3秒的请求数和第2秒的请求量),只需计算
sum[2] - sum[0] = 9 - 2 = 7
您可以为一小时内的每秒创建一个大小为60x60的数组,并将其用作循环缓冲区。每个条目包含给定秒的请求数。当你进入下一秒时,清除它并开始计数。当您处于数组的末尾时,您将再次从0开始,因此有效地清除了1小时之前的所有计数。
- For Hour:返回所有元素的总和
- For Minute:返回最后60个条目的总和(来自currentIndex)
- For Second:当前索引的返回计数
所以这三者都具有O(1)的空间和时间复杂性。唯一的缺点是,它忽略了毫秒,但也可以应用相同的概念来包括毫秒。
以下代码在JS中。它将返回O(1)中的计数。我为一次采访写了这个节目,时间被预先定义为5分钟。但你可以修改这个代码几秒钟、几分钟等等。让我知道它的进展。
- 创建一个以毫秒为键、以计数器为值的对象
- 添加名为totalCount的属性并将其预定义为0
- 对于步骤1中定义的每个命中日志增量计数器和totalCount
- 添加一个名为clean_hits的方法,每毫秒调用一次此方法
-
在clean_hits方法中,从我们创建的对象中删除每个条目(在我们的时间范围之外),并在删除条目之前从totalCount中减去该计数
this.hitStore = { "totalCount" : 0};
我不得不在Go中解决这个问题,我想我还没有看到这种方法,但它可能也非常适合我的用例。
由于它连接到第三方API,并且需要限制自己的请求,所以我只在最后一秒保留了一个计数器,在最后2分钟(我需要的两个计数器)
var callsSinceLastSecond, callsSinceLast2Minutes uint64
然后,当呼叫计数器低于我的允许限制时,我会在单独的go例程中启动我的请求
for callsSinceLastSecond > 20 || callsSinceLast2Minutes > 100 {
time.Sleep(10 * time.Millisecond)
}
在每个go例程结束时,我会原子性地递减计数器。
go func() {
time.Sleep(1 * time.Second)
atomic.AddUint64(&callsSinceLastSecond, ^uint64(0))
}()
go func() {
time.Sleep(2 * time.Minute)
atomic.AddUint64(&callsSinceLast2Minutes, ^uint64(0))
}()
到目前为止,这似乎在一些相当繁重的测试中没有任何问题。
这里有一个通用的Java解决方案,可以跟踪最后一分钟的事件数量。
我之所以使用ConcurrentSkipListSet
,是因为它保证了搜索、插入和删除操作的平均时间复杂性为O(logN)。您可以很容易地更改下面的代码,使持续时间(默认为1分钟)可配置。
正如上面的答案所建议的,定期清理过时的条目是一个好主意,例如使用调度器。
@Scope(value = "prototype")
@Component
@AllArgsConstructor
public class TemporalCounter {
@Builder
private static class CumulativeCount implements Comparable<CumulativeCount> {
private final Instant timestamp;
private final int cumulatedValue;
@Override
public int compareTo(CumulativeCount o) {
return timestamp.compareTo(o.timestamp);
}
}
private final CurrentDateTimeProvider currentDateTimeProvider;
private final ConcurrentSkipListSet<CumulativeCount> metrics = new ConcurrentSkipListSet<>();
@PostConstruct
public void init() {
Instant now = currentDateTimeProvider.getNow().toInstant();
metrics.add(new CumulativeCount(now, 0));
}
public void increment() {
Instant now = currentDateTimeProvider.getNow().toInstant();
int previousCount = metrics.isEmpty() ? 0 : metrics.last().cumulatedValue;
metrics.add(new CumulativeCount(now, previousCount + 1));
}
public int getLastCount() {
if (!metrics.isEmpty()) {
cleanup();
CumulativeCount previousCount = metrics.first();
CumulativeCount mostRecentCount = metrics.last();
if (previousCount != null && mostRecentCount != null) {
return mostRecentCount.cumulatedValue - previousCount.cumulatedValue;
}
}
return 0;
}
public void cleanup() {
Instant upperBoundInstant = currentDateTimeProvider.getNow().toInstant().minus(Duration.ofMinutes(1));
CumulativeCount c = metrics.lower(CumulativeCount.builder().timestamp(upperBoundInstant).build());
if (c != null) {
metrics.removeIf(o -> o.timestamp.isBefore(c.timestamp));
if (metrics.isEmpty()) {
init();
}
}
}
public void reset() {
metrics.clear();
init();
}
}
我的解决方案:
-
维护一个3600的散列,其中包含一个计数、时间戳作为字段。
-
对于每个请求:
- 按时间戳%3600获取idx(当前元素的arry索引)
- 如果hash[idx].count=0,则hash[idx].count=1,并且hash[idx].timestamp=输入时间戳
- 如果hash[idx].count>0,然后
案例(1):如果i/p timestamp==hash[idx].timestamp,hash[count]++;
情况(2):如果i/p时间戳>hash[idx].timestamp,然后hash[idx][count=1hash[idx].timestamp=inputTimeStamp
情况(3)::如果i/p时间戳<hash[idx].count//旧请求,可以忽略。
现在,对于最后一秒、分钟、小时的任何查询:如上所述查找idx,只要时间戳在给定的秒/范围/分钟内与匹配,就继续以循环方式从idx迭代回来。
一个简单的时间戳列表怎么样?每次发出请求时,都会将当前时间戳附加到列表中。每次你想检查自己是否低于速率限制时,你首先删除超过1小时的时间戳,以防止堆栈溢出(呵呵),然后计算最后一秒、分钟或其他时间戳的数量。
这可以在Python中轻松完成:
import time
requestsTimestamps = []
def add_request():
requestsTimestamps.append(time.time())
def requestsCount(delayInSeconds):
requestsTimestamps = [t for t in requestsTimestamps if t >= time.time() - 3600]
return len([t for t in requestsTimestamps if t >= time.time() - delayInSeconds])
我想这是可以优化的,但你看到了这个想法。