并行HttpClient请求超时由于异步问题?



我正在使用System.Threading.Tasks.Parallel.ForEach同步并行运行一个方法。在该方法的最后,它需要发出几十个HTTPPOST请求,这些请求彼此不依赖。因为我是在。net Framework 4.6.2上,System.Net.Http.HttpClient是完全异步的,所以我使用Nito.AsyncEx.AsyncContext来避免死锁,形式为:

public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
AsyncContext.Run(async () => await Task.WhenAll(enumerable.Select(async c => 
await getResultsFor(c).ConfigureAwait(false))));
}

getResultsFor(MyClass c)方法然后创建一个HttpRequestMessage并使用:

await httpClient.SendAsync(request);

然后解析响应,并在MyClass实例上设置相关字段。

我的理解是,同步线程将阻塞在AsyncContext.Run(...),而许多任务由AsyncContext拥有的单个AsyncContextThread异步执行。当它们全部完成时,同步线程将解除阻塞。

这对于几百个请求来说工作得很好,但是当它在五分钟内扩展到几千个请求时,一些请求开始从服务器返回HTTP 408 Request Timeout错误。我的日志显示,这些超时发生在发送请求最多的峰值负载时,并且超时发生在收到许多其他请求之后很久。

我认为问题在于任务是awaitHttpClient内的服务器握手,但它们不是以FIFO顺序继续,所以当它们继续握手时,握手已经过期。然而,我想不出任何方法来处理这个问题,除了使用System.Threading.SemaphoreSlim来强制一次只有一个任务可以await httpClient.SendAsync(...)

我的应用程序非常大,完全转换为异步是不可行的。

这不是在阻塞之前包装任务可以完成的事情。对于初学者来说,如果请求通过,您可能最终会使服务器崩溃。现在你在用核武器对付客户。在。net框架中,每个域有2个并发请求的限制是可以放宽的,但是如果你把它设置得太高,你可能会导致服务器崩溃。

你可以通过在管道中使用DataFlow块来解决这个问题,以固定的并行度执行请求,然后解析它们。假设你有一个名为MyPayload的类,在属性中有很多Items:

ServicePointManager.DefaultConnectionLimit = 1000;
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
var downloader=new TransformBlock<string,MyPayload>(async url=>{
var json=await _client.GetStringAsync(url);
var data=JsonConvert.DeserializeObject<MyPayload>(json);
return data;
},options);
var importer=new ActionBlock<MyPayload>(async data=>
{
var items=data.Items;

using(var connection=new SqlConnection(connectionString))
using(var bcp=new SqlBulkCopy(connection))
using(var reader=ObjectReader.Create(items))
{
bcp.DestinationTableName = destination;
connection.Open();
await bcp.WriteToServerAsync(reader);
}
});

downloader.LinkTo(importer,new DataflowLinkOptions { 
PropagateCompletion=true
});

我使用FastMember的ObjectReader将项目包装在DbDataReader中,DbDataReader可用于批量插入记录到数据库。

一旦你有了这个管道,你可以开始发布url到头部块,downloader:

foreach(var url in hugeList)
{
downloader.Post(url);
}
downloader.Complete();

一旦所有的url都被发布,你告诉donwloader完成并等待管道中的最后一个块完成:

await importer.Completion;

首先,Nito.AsyncEx.AsyncContext将在一个线程池线程上执行;要以上述方式避免死锁,需要Nito.AsyncEx.AsyncContextThread的实例,如文档中所述。

有两个可能的原因:

  • .NET Framework 4.6.2中System.Net.Http.HttpClient的bug
  • 问题中概述的继续优先级问题,其中单个请求没有足够及时地继续,因此超时。

正如这个答案及其注释中所描述的,从一个类似的问题来看,可能可以使用自定义TaskScheduler来处理优先级问题,但是使用信号量来限制并发请求的数量可能是最好的答案:

using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
public class MyClass 
{
private static readonly AsyncContextThread asyncContextThread
= new AsyncContextThread();
private static readonly HttpClient httpClient = new HttpClient();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);
public HttpRequestMessage Request { get; set; }
public HttpResponseMessage Response { get; private set; }

private async Task GetResponseAsync()
{
await semaphore.WaitAsync();
try
{
Response = await httpClient.SendAsync(Request);
}
finally
{
semaphore.Release();
}
}
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
Task.WaitAll(enumerable.Select(c =>
asyncContextThread.Factory.Run(() =>
c.GetResponseAsync())).ToArray());
}
}

编辑使用AsyncContextThread在非线程池线程上执行异步代码,如预期的那样。AsyncContext本身不会这样做

最新更新