对API的并发调用,保持成功的结果并忽略错误



我编写了一个方法来同时处理十分之一的API调用(请参阅下面的代码(。

问题是API的服务器同时承受大约20个调用,但除此之外,它开始为一些调用返回500个错误
当出现这种错误时,所有的任务都会以某种方式"取消";或";忘记";,不仅是返回500的调用,因为生成的metadatas集合为空
我试图用try/catch包围Task.WaitAll,但这里没有抛出异常。这并不奇怪,因为TryGetMetadataAsync都包含在try/catch中。

我只想保留成功呼叫的结果,忘记错误呼叫
知道我遗漏了什么吗?也许取消代币有问题?

FWIW:api.GetMetadataWithHttpInfoAsync是一个开放的API生成器客户端(从Swagger文件生成(

public async Task<IEnumerable<Metadata>> GetMetadataAsync(IEnumerable<string> dataRefs,
string? userJwt, CancellationToken cancellationToken)
{
var userId = GetUserIdFromJwt(userJwt);
var tasks = dataRefs
.Where(dataRef => !string.IsNullOrEmpty(dataRef))
.Select(dataRef => TryGetMetadataAsync(dataRef, userId, cancellationToken))
.ToArray();
// Load data concurrently and wait for all results:
Task.WaitAll(tasks, cancellationToken);
var metadatas = new List<Metadata>();
foreach (var task in tasks)
{
var metadata = await task;
if (metadata != null) { metadatas.Add(metadata); }
}
return metadatas;
}
private async Task<Metadata?> TryGetMetadataAsync(string dataRef, string userId,
CancellationToken cancellationToken)
{
try
{
if (cancellationToken.IsCancellationRequested) { return null; }
// Add a custom timeout to 30 seconds
// (don't wait for potential TaskCanceledException that may be due
// to 500 errors):
using var timeoutCancellationTokenSource = new CancellationTokenSource(
TimeSpan.FromSeconds(30));
var id = GetIdFromDataRef(dataRef);
if (id == null) { return null; }
var metadataResponse = await api.GetMetadataWithHttpInfoAsync(id, userId,
timeoutCancellationTokenSource.Token);
if (metadataResponse.StatusCode == HttpStatusCode.OK
&& metadataResponse.Data != null)
{
// Just a simple object conversion here...
return await ConvertToMetadataAsync(dataRef, metadataResponse.Data);
}
else
{
// Error management removed for brevity...
}
}
catch (ApiException apiEx)
{
// Exception specific management removed for brevity...
}
catch (Exception ex)
{
// Exception management removed for brevity...
}
return null;
}

感谢@TheodorZoulias和@JohnWu的评论,解决方案是使用Task.WhenAllContinueWith的组合(事实上,可能与"有可能从任务中获得成功的结果吗?当其中一个任务失败时,WhatAll?"重复(:

下面是一个工作代码(将输入分割成块以保持API的响应能力(:

public new async Task<IEnumerable<Metadata>> GetMetadataAsync(IEnumerable<string> dataRefs, string? userJwt, CancellationToken cancellationToken)
{
var userId = GetUserIdFromJwt(userJwt);
var metadatas = new ConcurrentBag<Metadata>();
// Chunk size to split input sequence:
const int chunkSize = 13;
// Split dataRefs in chunks:
var drChunks = dataRefs.Partition(chunkSize).ToList();
foreach (var chunk in drChunks)
{
var tasks = chunk
.Where(partDataRef => !string.IsNullOrEmpty(partDataRef))
.Select(partDataRef => TryGetMetadataAsync(partDataRef, userId, cancellationToken))
.ToArray();
try
{
// Use a ContinueWith to keep successful results and ignore errors
await Task.WhenAll(tasks).ContinueWith(t =>
{
// Retrieve successful calls results:
var chunkMetadatas = tasks
.Where(t => t.Status == TaskStatus.RanToCompletion)
.Where(t => t.Result != null)
.Select(t => t.Result)
.Cast<Metadata>()
.ToList();
foreach (var metadata in chunkMetadatas) { metadatas.Add(metadata); }
// Manage faulted or canceled tasks:
var aggregateExceptions = tasks
.Where(t => t.IsFaulted)
.Where(t => t.Exception != null)
.Select(t => t.Exception) // The Exception is of type AggregateException
.Cast<AggregateException>()
.ToArray();
var exceptions = new AggregateException(aggregateExceptions)
.Flatten()
.InnerExceptions
.ToList(); // Flatten the hierarchy of AggregateExceptions
if (exceptions.Count > 0)
{
// Logging...
}
else if (t.IsCanceled)
{
// No exceptions and at least one task was canceled
// Logging...
}
}, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
catch (Exception ex)
{
// Logging...
}
}
return metadatas;
}

尝试使用await Task.WhenAll代替Task.WaitAll像这样:

var tasks = dataRefs
.Where(dataRef => !string.IsNullOrEmpty(dataRef))
.Select(dataRef => TryGetMetadataAsync(dataRef, userId, cancellationToken))
.ToArray();
await Task.WhenAll(tasks);
var metadatas = new List<Metadata>();
foreach (var task in tasks)
{
var metadata = await task;
if (metadata != null) { metadatas.Add(metadata); }
}

可能Task.WaitAll只是不继续为TryGetMetadataAsync调用生成的状态机

最新更新