使用 Polly 重试 ASP.NET 核心中的请求时出错'Stream was already consumed'



我有一个ASP.NET Core 3.1 Web API服务,它正在接收Web请求,对其进行一些操作,然后将其传递到后端服务并同步返回响应。它运行得很好,但我想为那些后端请求引入一些重试逻辑,以防出现一些问题。

我正在使用一个类型化的HttpClient,并试图使用Polly来实现重试逻辑:https://github.com/App-vNext/Polly/wiki/Polly-and-HttpClientFactory#using-polly与ihttpclientfactory

当后端服务工作时,一切似乎都很好,但不幸的是,每当我的后端返回类似500内部服务器错误的错误时,我就会得到以下异常:

Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware: Error: An unhandled exception has occurred while executing the request.
System.Net.Http.HttpRequestException: An error occurred while sending the request.
---> System.InvalidOperationException: The stream was already consumed. It cannot be read again.
at System.Net.Http.StreamContent.PrepareContent()
at System.Net.Http.StreamContent.SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken)
at System.Net.Http.HttpContent.CopyToAsync(Stream stream, TransportContext context, CancellationToken cancellationToken)
at System.Net.Http.HttpConnection.SendRequestContentAsync(HttpRequestMessage request, HttpContentWriteStream stream, CancellationToken cancellationToken)
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithNtConnectionAuthAsync(HttpConnection connection, HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
at System.Net.Http.DiagnosticsHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
at Microsoft.Extensions.Http.Logging.LoggingHttpMessageHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
at Polly.Retry.AsyncRetryEngine.ImplementationAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates`1 shouldRetryResultPredicates, Func`5 onRetryAsync, Int32 permittedRetryCount, IEnumerable`1 sleepDurationsEnumerable, Func`4 sleepDurationProvider, Boolean continueOnCapturedContext)
at Polly.AsyncPolicy`1.ExecuteAsync(Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext)
at Microsoft.Extensions.Http.PolicyHttpMessageHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
at Microsoft.Extensions.Http.Logging.LoggingScopeHttpMessageHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
at System.Net.Http.HttpClient.FinishSendAsyncBuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
at MyProject.MyController.Routing.HttpMessageRouter.SendRequestAndTrackTiming(Func`1 action, String destinationID) in /mnt/c/Users/myCode/source/repos/MyProject-GitLab/MyController/src/MyController/Routing/HttpMessageRouter.cs:line 59
at MyProject.MyController.Routing.HttpMessageRouter.SendNewRequest(IMessageWrapper`1 message) in /mnt/c/Users/myCode/source/repos/MyProject-GitLab/MyController/src/MyController/Routing/HttpMessageRouter.cs:line 33
at MyProject.MyController.Controllers.MyControllerController.Resource(String destinationId) in /mnt/c/Users/myCode/source/repos/MyProject-GitLab/MyController/src/MyController/Controllers/MyControllerController.cs:line 151
at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfIActionResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeInnerFilterAsync>g__Awaited|13_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|19_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
at Microsoft.AspNetCore.Authentication.AuthenticationMiddleware.Invoke(HttpContext context)
at Prometheus.HttpMetrics.HttpRequestDurationMiddleware.Invoke(HttpContext context)
at Prometheus.HttpMetrics.HttpRequestCountMiddleware.Invoke(HttpContext context)
at Prometheus.HttpMetrics.HttpInProgressMiddleware.Invoke(HttpContext context)
at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)

我必须发送请求的代码如下。第一个方法实际上只是使用了一个委托,这样我就可以透明地添加一些围绕类型化HttpClient调用收集的度量。键入的HttpClient在以下代码中被称为RoutingClient

public async Task<IActionResult> SendNewRequest(IMessageWrapper<HttpRequestMessage> message)
{
HttpResponseMessage destinationResponse = await SendRequestAndTrackTiming(() => _client.SendAsync(message.Message), message.DestinationID);
return CreateSerializeableResponseMessage(destinationResponse);
}
private ResponseMessageResult CreateSerializeableResponseMessage(HttpResponseMessage httpResponse)
{
ResponseMessageResult responseMessage = new ResponseMessageResult(httpResponse);
IOutputFormatter[] formattersList = { new HttpResponseMessageOutputFormatter() };
FormatterCollection<IOutputFormatter> formattersCollection = new FormatterCollection<IOutputFormatter>(formattersList);
responseMessage.Formatters = formattersCollection;
return responseMessage;
}
private async Task<HttpResponseMessage> SendRequestAndTrackTiming(Func<Task<HttpResponseMessage>> action, string destinationID)
{
HttpResponseMessage response = null;
Stopwatch stopwatch = new Stopwatch();
try
{
stopwatch.Start();
response = await action();
return response;
}
finally
{
stopwatch.Stop();
HttpStatusCode statusCode = (response != null) ? response.StatusCode : HttpStatusCode.InternalServerError;
_routedMessageMetricTracker.Histogram.Observe(destinationID, statusCode, stopwatch.Elapsed.TotalSeconds);
}
}

这是我在Startup.ConfigureServices((中的代码(实际上这是在我定义的扩展方法中(:

public static IServiceCollection AddRoutingClient(this IServiceCollection services, RoutingSettings routingSettings)
{
List<TimeSpan> retryTimeSpans = new List<TimeSpan>();
// routingSettings.RetrySeconds is just an array of double values.
foreach (double retrySeconds in routingSettings.RetrySeconds)
{
if (retrySeconds >= 0) retryTimeSpans.Add(TimeSpan.FromSeconds(retrySeconds));
}
services.AddHttpClient<IRoutingClient, RoutingClient>()
.AddPolicyHandler((services, request) => HttpPolicyExtensions.HandleTransientHttpError()
.WaitAndRetryAsync(retryTimeSpans, onRetry: (outcome, timespan, retryAttempt, context) =>
{
services.GetService<ILogger<RoutingClient>>()?
.LogWarning($"Delaying for {timespan.TotalMilliseconds}ms, then making retry {retryAttempt}.");
}
));
return services;
}

我真的很想用波利的方法,因为它看起来很干净,但我不确定我在这里做错了什么。我一定做错了什么,因为我认为这是Polly的一个非常常见的用例,应该进行处理。

事实证明,我的问题不是由处理上述响应的代码引起的。相反,它实际上是由我的代码操纵请求引起的。

我通过HttpContext.GetHttpRequestMessage()方法将传入请求作为HttpRequestMessage对象读取,并尝试重用同一对象,通过调用我键入的HttpClient将其传递给后端服务。然而,该请求的内容流被读取一次,因此我不得不复制该HttpRequestMessage,如对另一篇帖子的回答所述:https://stackoverflow.com/a/34049029/1221718

下面是这个答案的代码的一个稍微详细一点的版本:

private static async Task<HttpRequestMessage> CloneHttpRequestMessageAsync(HttpRequestMessage request)
{
HttpRequestMessage copyOfRequest = new HttpRequestMessage(request.Method, request.RequestUri);
// Copy the request's content (via a MemoryStream) into the cloned object
var ms = new MemoryStream();
if (request.Content != null)
{
await request.Content.CopyToAsync(ms).ConfigureAwait(false);
ms.Position = 0;
copyOfRequest.Content = new StreamContent(ms);
// Copy the content headers
if (request.Content.Headers != null)
{
foreach (var h in request.Content.Headers)
{
copyOfRequest.Content.Headers.Add(h.Key, h.Value);
}
}
}
copyOfRequest.Version = request.Version;
foreach (KeyValuePair<string, object> prop in request.Properties)
{
copyOfRequest.Properties.Add(prop);
}
foreach (KeyValuePair<string, IEnumerable<string>> header in request.Headers)
{
copyOfRequest.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
return copyOfRequest;
}

这个问题的另一个解决方案是在请求流上启用缓冲。这是通过调用HttpRequest的扩展方法EnableBuffering()来完成的,例如

public async Task InvokeAsync(HttpContext context, RequestDelegate next)
{
context.Request.EnableBuffering();
...

参见https://devblogs.microsoft.com/dotnet/re-reading-asp-net-core-request-bodies-with-enablebuffering/了解更多详细信息。

相关内容

最新更新