C#没有等待执行所有任务



我试图同时执行多个对Pi Number API的请求。主要的问题是,尽管有'Task.WWhenAll(ExecuteRequests(((.Wait((;'行,它并没有完成所有任务。它应该执行50个请求,并将结果添加到pi Dictionary中,但在执行代码后,字典大约有44~46个项。我试图添加一个"线程池验证时可用的线程",这样我就可以保证我有足够的线程,但什么都没有改变。

另一个问题是,有时当我运行代码时,我会出现一个错误,说我试图将一个已经添加的密钥添加到决策中,但lock语句不应该保证不会发生这个错误?

const int TotalRequests = 50;
static int requestsCount = 0;
static Dictionary<int, string> pi = new();
static readonly object lockState = new();
static void Main(string[] args)
{
var timer = new Stopwatch();
timer.Start();
Task.WhenAll(ExecuteRequests()).Wait();
timer.Stop();
foreach (var item in pi.OrderBy(x => x.Key))
Console.Write(item.Value);
Console.WriteLine($"nn{timer.ElapsedMilliseconds}ms");
Console.WriteLine($"n{pi.Count} items");
}
static List<Task> ExecuteRequests()
{
var tasks = new List<Task>();
for (int i = 0; i < TotalRequests; i++)
{
ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
while (workerThreads < 1)
{
ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
Thread.Sleep(100);
}
tasks.Add(Task.Run(async () =>
{
var currentRequestId = 0;
lock (lockState)
currentRequestId = requestsCount++;
var httpClient = new HttpClient();
var result = await httpClient.GetAsync($"https://api.pi.delivery/v1/pi?start={currentRequestId * 1000}&numberOfDigits=1000");
if (result.StatusCode == System.Net.HttpStatusCode.OK)
{
var json = await result.Content.ReadAsStringAsync();
var content = JsonSerializer.Deserialize<JsonObject>(json)!["content"]!.ToString();
//var content = (await JsonSerializer.DeserializeAsync<JsonObject>(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json)!)!)!)!["content"]!.ToString();
pi.Add(currentRequestId, content);
}
}));
}
return tasks;
}

只有一个问题-您只打开了代码的一个部分,该部分与线程有问题:

lock (lockState)
currentRequestId = requestsCount++;

但是,还有另一个:

pi.Add(currentRequestId, content);

这个问题与词典理念有关——有很多读者,只有一个作者。所以,您看到了有异常的情况,如果您编写try-catch,您将看到AggregateException,它几乎在所有情况下都意味着线程问题,所以,您需要这样做:

lock (lockState)
pi.Add(currentRequestId, content);

正如@AlexeiLevenkov所提到的,我在决策操作周围放了一个锁语句,效果很好。

tasks.Add(Task.Run(async () =>
{
var currentRequestId = 0;
lock (lockState)
currentRequestId = requestsCount++;
var httpClient = new HttpClient();
var result = await httpClient.GetAsync($"https://api.pi.delivery/v1/pi?start={currentRequestId * 1000}&numberOfDigits=1000");
if (result.StatusCode == System.Net.HttpStatusCode.OK)
{
var json = await result.Content.ReadAsStringAsync();
var content = JsonSerializer.Deserialize<JsonObject>(json)!["content"]!.ToString();
//var content = (await JsonSerializer.DeserializeAsync<JsonObject>(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json)!)!)!)!["content"]!.ToString();
lock (lockState)
pi.Add(currentRequestId, content);
}
}));

我不是直接回答这个问题,只是建议您可以使用Microsoft的Reactive Framework(又名Rx(-NuGetSystem.Reactive并添加using System.Reactive.Linq;,然后您就可以这样做了:

static void Main(string[] args)
{
var timer = new Stopwatch();
timer.Start();
(int currentRequestId, string content)[] results = ExecuteRequests(50).ToArray().Wait()
timer.Stop();
foreach (var item in results.OrderBy(x => x.currentRequestId))
Console.Write(item.content);
Console.WriteLine($"nn{timer.ElapsedMilliseconds}ms");
Console.WriteLine($"n{results.Count()} items");
}
static IObservable<(int currentRequestId, string content)> ExecuteRequests(int totalRequests) =>
Observable
.Defer(() =>
from currentRequestId in Observable.Range(0, totalRequests)
from content in Observable.Using(() => new HttpClient(), hc =>
from result in Observable.FromAsync(() => hc.GetAsync($"https://api.pi.delivery/v1/pi?start={currentRequestId * 1000}&numberOfDigits=1000"))
where result.StatusCode == System.Net.HttpStatusCode.OK
from json in Observable.FromAsync(() => result.Content.ReadAsStringAsync())
select JsonSerializer.Deserialize<JsonObject>(json)!["content"]!.ToString())
select new
{
currentRequestId,
content,
});

最新更新