使用循环限制并发异步请求



我目前正在向Web API发出大量请求。我试图async这个过程,以便我可以在合理的时间内这样做,但是我无法限制连接,以便每秒发送的请求不超过10个。我正在使用信号量进行限制,但我不完全确定它在这种情况下如何工作,因为我有一个嵌套循环。

我基本上得到了一个模型列表,每个模型都有一个日期列表。我需要在模型内提出每一天的要求。天数可以从1到大约5099%时间只会是1。所以我想async每个模型,因为大约有3000个模型,但我想async需要完成的天数。我需要保持在或低于10个请求/秒,所以我认为最好的方法是在整个操作中将请求限制设置为 10。有没有一个地方可以放置一个信号量来限制与整个链的连接?

每个单独的请求还必须对2不同的数据片段发出两个请求,并且此 API 目前不支持任何类型的批处理。

我对 c# 有点陌生,对async

非常陌生,对WebRequests/HttpClient也很陌生,所以任何帮助都值得赞赏。我试图在这里添加所有相关代码。如果您需要其他任何东西,请告诉我。

public static async Task GetWeatherDataAsync(List<Model> models)
{
SemaphoreSlim semaphore = new SemaphoreSlim(10);
var taskList = new List<Task<ComparisonModel>>();
foreach (var x in models)
{
await semaphore.WaitAsync();
taskList.Add(CompDaysAsync(x));
}
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception e) { }
finally
{
semaphore.Release();
}
}
public static async Task<Models> CompDaysAsync(Model model)
{
var httpClient = new HttpClient();
httpClient.DefaultRequestHeaders.Authorization = new 
Headers.AuthenticationHeaderValue("Token","xxxxxxxx");
httpClient.Timeout = TimeSpan.FromMinutes(5);
var taskList = new List<Task<Models.DateTemp>>();
foreach (var item in model.list)
{
taskList.Add(WeatherAPI.GetResponseForDayAsync(item, 
httpClient, Latitude, Longitude));
}
httpClient.Dispose();
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception e) { }
return model;
}
public static async Task<DateTemp> GetResponseForDayAsync(DateTemp date, HttpClient httpClient, decimal? Latitude, decimal? Longitude)
{
var response = await httpClient.GetStreamAsync(request1);
StreamReader myStreamReader = new StreamReader(response);
string responseData = myStreamReader.ReadToEnd();
double[] data = new double[2];
if (responseData != "[[null, null]]")
{
data = Array.ConvertAll(responseData.Replace("[", "").Replace("]", "").Split(','), double.Parse);
}
else { data = null; };
double precipData = 0;
var response2 = await httpClient.GetStreamAsync(request2);
StreamReader myStreamReader2 = new StreamReader(response2);
string responseData2 = myStreamReader2.ReadToEnd();
if (responseData2 != null && responseData2 != "[null]" && responseData2 != "[0.0]")
{
precipData = double.Parse(responseData2.Replace("[", "").Replace("]", ""));
}
date.Precip = precipData;
if (data != null)
{
date.minTemp = data[0];
date.maxTemp = data[1];
}
return date;
}

我认为您完全不明白SemaphoreSlim做什么。

信号量是基于
  1. 方法级的局部变量,因此每个GetWeatherDataAsync方法调用都会生成对 API 的10调用,而无需等待其他客户端。
  2. 此外,如果models.Count > 10,您的代码将死锁,因为您在每次迭代中等待信号量,这些请求都会被堆叠,并且11th您的线程将永远挂起,因为您不会发布信号量:

    var semaphore = new SemaphoreSlim(10);
    foreach (var item in Enumerable.Range(0, 15))
    {
    // will stop after 9
    await semaphore.WaitAsync();
    Console.WriteLine(item);
    }
    

您真正需要做的是将信号量移动到实例级别(甚至是带有static关键字的类型级别),并在GetWeatherDataAsync等待它,然后将Release放入finally块中。

至于Parallel.Foreach- 你不应该在这种情况下使用它,因为它不知道async方法(它是在async/await之前引入的),而且你的方法看起来不像是 CPU 密集型的。

最新更新