信号量很小,可以处理每个时间段的节流



我有一个客户的要求,要调用他们的API,但由于节流限制,我们一分钟只能调用100个API。我正在使用SemaphoreSlim来处理这个问题,这是我的代码。

async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls) 
{
var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var tasks = locationApiCalls.Select(async locationApiCall =>
{
await semaphoreSlim.WaitAsync();
try
{
var response = await RestApi.GetResponseAsync(locationApi);
if (response.IsSuccessful)
{
passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
else
{
failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
}
finally
{
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}

然而,这条线路

var passedResponsesList = passedResponses.SelectMany(x => x).ToList();

从来没有被执行过,我也看到了很多失败的响应,我想我也必须在代码中的某个地方添加Task.Delay(1分钟(。

您需要跟踪执行前100个请求的时间。在下面的示例实现中,ConcurrentQueue<TimeSpan>记录了这100个请求中每一个请求的相对完成时间。通过从该队列中取出第一个(因此也是最早的(时间,您可以检查自100个请求以来已经过了多少时间。如果不到一分钟,那么下一个请求需要等待该分钟的剩余时间才能执行。

async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls)
{
var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
var total = 0;
var stopwatch = Stopwatch.StartNew();
var completionTimes = new ConcurrentQueue<TimeSpan>();
var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var tasks = locationApiCalls.Select(async locationApiCall =>
{
await semaphoreSlim.WaitAsync();
if (Interlocked.Increment(ref total) > 100)
{
completionTimes.TryDequeue(out var earliest);
var elapsed = stopwatch.Elapsed - earliest;
var delay = TimeSpan.FromSeconds(60) - elapsed;
if (delay > TimeSpan.Zero)
await Task.Delay(delay);
}
try
{
var response = await RestApi.GetResponseAsync(locationApi);
if (response.IsSuccessful)
{
passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
else
{
failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
}
finally
{
completionTimes.Enqueue(stopwatch.Elapsed);
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}

如果您从WinForms或WPF应用程序的UI线程调用此方法,请记住将ConfigureAwait(false)添加到其await语句中。

最新更新