Core controller,我有一个Action,它看起来像这样:
public async Task<ActionResult> SomeAction(int someId)
{
int[] itemIds = await _service.GetSomeItems(someId);
var model = new List<ItemDetail>(itemIds.Length);
foreach (int id in itemIds)
model.Add(await _service.GetItemDetails(id));
return View(model);
}
它工作,但相当慢。我想提高"someaction"的性能。通过生成多个并行任务:
public async Task<ActionResult> SomeAction(int someId)
{
int[] itemIds = await _service.GetSomeItems(someId);
var tasks = new List<Task>();
foreach (int id in itemIds)
tasks.Add(_service.GetItemDetails(id));
var model = await Task.WhenAll(tasks);
return View(model);
}
这样做效果更好,但我不确定这种方法是否是一个好的实践。通常是_service。GetSomeItems API返回20-30个元素,但有时可能返回100-200个元素。在我的例子中,_service
是第三方REST API,延迟100-150毫秒。
我的问题是-有多少任务可以我一次产卵?如果同时出现多个请求(每个请求将生成20-200个任务),我的代码会导致ThreadPool耗尽吗?我的方法通常是不好的做法吗?
我试图避免的是ThreadPool饥饿,这可能会在某些情况下导致死锁。
更新:_service.GetItemDetails(id)
-是第三方Web API,请求延迟为100-150 ms.
这不是一个好的做法。问题不在于可以创建多少任务,而在于其他服务器在阻塞或限制您之前将接受多少请求、网络开销和延迟、网络上的压力。你正在进行网络调用,这意味着你的CPU不需要做任何事情,它只是等待远程服务器的响应。
然后是服务器端的开销:每个请求打开一个连接,执行单个查询,序列化结果。如果服务器的代码写得不好,它可能会崩溃。如果负载平衡器配置得不好,所有1000个并发调用可能最终由同一台机器提供服务。这1000个请求可能会打开1000个相互阻塞的连接。
如果远程API允许,尝试一次请求多个项目。您只需要支付一次开销,服务器只需要执行一个查询。
否则,您可以尝试并发执行多个连接,但不能一次执行所有连接。通过限制并发执行的请求数量和可能的执行频率来限制请求要好得多。
在。net 6(下一个LTS版本,已经在生产环境中得到支持)中,你可以在有限的并行度下使用Parallel.ForEachAsync
:
ConcurrentQueue<Something> list=new ConcurrentQueue<Something>(1000);
var options=new ParallelOptions { MaxDegreeOfParallelism = 20};
await Parallel.ForEachAsync(itemIds,options,async id=>{
var result=_service.GetItemDetails(id);
var something=CreateSomething(result);
list.Add(something);
});
model.Items=list;
return View(model);
另一个选项是创建使用通道或TPL数据流块来创建并发处理项的处理管道,例如第一步生成id,下一步检索远程项,最后一步产生最终结果:
ChannelReader<int> GetSome(int someId,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<int>();
int[] itemIds = await _service.GetSomeItems(someId);
foreach(var id in itemIds)
{
if(token.IsCancellationRequested)
{
return;
}
channel.Writer.TryWrite(id);
}
return channel;
}
ChannelReader<Detail> GetDetails(ChannelReader<int> reader,int dop,
CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<Detail>();
var writer=channel.Writer;
var options=new ParallelOptions {
MaxDegreeOfParallelism=dop,
CancellationToken=token
};
var worker=Parallel.ForEachAsync(reader.ReadAllAsync(token),
options,
async id=>{
try {
var details=_service.GetItemDetails(id);
await writer.WriteAsync(details);
}
catch(Exception exc)
{
//Handle the exception so we can keep processing
//other messages
}
});
worker.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
async Task<Model> CreateModel(ChannelReader<Detail> reader,...)
{
var allDetails=new List<Detail>(1000);
await(foreach var detail in reader.ReadAllAsync(token))
{
allDetails.Add(detail);
//Do other heavyweight things
}
var model=new Model {Details=allDetails,.....});
}
这些方法可以在管道中链接:
var chIds=GetSome(123);
var chDetails=GetDetails(chIds,20);
var model=await CreateModel(chDetails);
这是使用通道时的常见模式。在像Go这样的语言中,通道用于创建多步骤的处理管道。
将这些方法转换为扩展方法允许以流畅的方式创建管道:
static ChannelReader<Detail> GetDetails(this ChannelReader<int> reader,int dop,CancellationToken token=default)
static async Task<Model> CreateModel(this ChannelReader<Detail> reader,...)
var model= await GetSome(123);
.GetDetails(20)
.CreateModels();