封装速率限制API调用



我访问了一个API调用,该调用每秒接受最大的调用速率。如果超过速率,将引发异常。

我想把这个调用封装成一个抽象,它做必要的事情来将调用速率保持在限制之下。它就像一个网络路由器:处理多个呼叫,并将结果返回给关心呼叫率的正确呼叫者。目标是使调用代码尽可能不知道该限制。否则,代码中具有此调用的每个部分都必须封装到try-catch中!

例如:假设您可以从外部API调用一个可以添加2个数字的方法。此API每秒可调用5次。任何高于此值的值都将导致异常。

为了说明这个问题,限制调用速率的外部服务就像这个问题的答案中的服务:如何用Observable构建一个速率限制的API?

其他信息

由于您不希望每次从代码的任何部分调用此方法时都担心这个限制,因此您可以考虑设计一个可以调用的包装器方法,而不必担心速率限制。在内部,您关心限制,但在外部,您暴露了一个简单的异步方法。

它类似于网络服务器。它如何将正确的结果包返回给正确的客户?

多个调用者将调用此方法,并且他们将在到达时得到结果。这个抽象应该起到代理的作用。

我该怎么做?

我相信包装方法的公司应该像一样

public async Task<Results> MyMethod()

在方法内部,它将执行逻辑,可能使用反应扩展(Buffer)。我不知道。

但是怎么做呢?我的意思是,对这个方法的多次调用应该会将结果返回给正确的调用者。这可能吗?

有可用的速率限制库(请参阅Esendex的TokenBucket Github或Nuget)。

用法非常简单,此示例将轮询限制为1秒

// Create a token bucket with a capacity of 1 token that refills at a fixed interval of 1 token/sec.
ITokenBucket bucket = TokenBuckets.Construct()
  .WithCapacity(1)
  .WithFixedIntervalRefillStrategy(1, TimeSpan.FromSeconds(1))
  .Build();
// ...
while (true)
{
  // Consume a token from the token bucket.  If a token is not available this method will block until
  // the refill strategy adds one to the bucket.
  bucket.Consume(1);
  Poll();
}

我还需要为我的一个项目使其异步,我只是做了一个扩展方法:

public static class TokenBucketExtensions
{
    public static Task ConsumeAsync(this ITokenBucket tokenBucket)
    {
        return Task.Factory.StartNew(tokenBucket.Consume);
    }
}

使用它,您不需要抛出/捕获异常,编写包装器变得相当琐碎。

你应该做什么取决于你的目标和限制。我的假设:

  • 您希望在限速器生效时避免发出请求
  • 你无法预测一个特定的请求是否会被拒绝,或者如何才能再次允许另一个请求
  • 您不需要同时发出多个请求,并且当多个请求正在等待时,它们按哪个顺序完成并不重要

如果这些假设是有效的,您可以使用AsyncEx中的AsyncAutoResetEvent:在发出请求之前等待它的设置,在成功发出请求之后设置它,并在速率受限时在延迟之后设置它。

代码可以是这样的:

class RateLimitedWrapper<TException> where TException : Exception
{
    private readonly AsyncAutoResetEvent autoResetEvent = new AsyncAutoResetEvent(set: true);
    public async Task<T> Execute<T>(Func<Task<T>> func) 
    {
        while (true)
        {
            try
            {
                await autoResetEvent.WaitAsync();
                var result = await func();
                autoResetEvent.Set();
                return result;
            }
            catch (TException)
            {
                var ignored = Task.Delay(500).ContinueWith(_ => autoResetEvent.Set());
            }
        }
    }
}

用法:

public static Task<int> Add(int a, int b)
{
    return rateLimitedWrapper.Execute(() => rateLimitingCalculator.Add(a, b));
}

实现这一点的一个变体是确保调用之间的时间最短,如下所示:

private readonly Object syncLock = new Object();
private readonly TimeSpan minTimeout = TimeSpan.FromSeconds(5);
private volatile DateTime nextCallDate = DateTime.MinValue;
public async Task<Result> RequestData(...) {
    DateTime possibleCallDate = DateTime.Now;
    lock(syncLock) {
        // When is it possible to make the next call?
        if (nextCallDate > possibleCallDate) {
            possibleCallDate = nextCallDate;
        }
        nextCallDate = possibleCallDate + minTimeout;
    }
    TimeSpan waitingTime = possibleCallDate - DateTime.Now;
    if (waitingTime > TimeSpan.Zero) {
        await Task.Delay(waitingTime);
    }
    return await ... /* the actual call to API */ ...;
}

最新更新