Spring Integration-Manage 401 http出站适配器调用出错



我是spring集成的新手。

我有一个流,需要根据某些条件对其执行http或tcp调用。我关注的问题与http调用有关。调用的rest端点需要一个accessToken作为身份验证的头参数,该参数由具有两个方法getCurrentAccessToken()refreshAccessToken()的spring服务提供。我只想在currentAccessToken过期时调用方法refreshaccessToken

我想做的是在执行对rest api的调用时添加以下逻辑:

如果令牌过期,则其余端点返回401,我希望在流中截获此错误,并通过添加刷新的访问令牌重试请求。

@Bean
public IntegrationFlow clientIn(AbstractServerConnectionFactory server,
AbstractClientConnectionFactory client, LogService logService) {
return IntegrationFlows.from(Tcp.inboundAdapter(client)
.enrichHeaders(t -> t.headerFunction(IpHeaders.CONNECTION_ID, message -> this.client, true))
.log(msg -> "client: " + logService.log(msg))
.<byte[], Boolean>route(this::shouldForwardToHttp,
mapping -> mapping.subFlowMapping(true, sf -> sf
.enrichHeaders(t -> t.header("Content-Type", MimeTypeUtils.APPLICATION_JSON_VALUE))
.<byte[], RequestMessage>transform(this::buildRequestFromMessage)
.<RequestMessage, HttpEntity>transform(this::getHttpEntity)
.handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST)
.expectedResponseType(ResponseMessage.class))
.<ResponseMessage, byte[]>transform(p -> this.transformResponse(p))
.handle(Tcp.outboundAdapter(client))).subFlowMapping(false,
t -> t.handle(Tcp.outboundAdapter(server).retryInterval(1000))))
.get();
}
HttpEntity getHttpEntity(RequestMessage request) {
MultiValueMap<String, String> mv = new HttpHeaders();
mv.add("accessToken", tokenProvider.getCurrentAccessToken());
HttpEntity entity = new HttpEntity(request, mv);
return entity;
}

我尝试添加一个requestHandlerRetry建议并将其重定向到recoveryChannel,但我无法向调用方流返回一些内容,以便获得带有状态代码的响应,并使用新的accessToken重试调用。

你知道我该如何实现吗?

我认为您不需要重试建议,因为您肯定是通过捕获401异常并用刷新的令牌回调服务来模拟它的。听起来更像是递归。为了正确地实现它,我建议查看ExpressionEvaluatingRequestHandlerAdvice:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-处理人员建议链。是的,它类似于重试,也有failureChannel,但没有内置的重试,因为我们将模拟它在必要时一次又一次地调用同一个端点。

为了简化递归逻辑,我将把.handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST) .expectedResponseType(ResponseMessage.class))提取到一个单独的流中,并在主流中使用带有该流输入通道的gateway()

failureChannel子流应当将其消息重新路由回到网关流的输入。

该逻辑中最重要的部分是执行所有原始请求消息头,其中包括网关逻辑replyChannel所需的。

查看有关网关的更多文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway.

ExpressionEvaluatingRequestHandlerAdvicefailureChannel发送消息时,它以MessageHandlingExpressionEvaluatingAdviceException作为有效载荷的ErrorMessage出现。导致故障并具有所有必需标头的消息位于getFailedMessage()属性中。所以,您获取该消息,请求新的令牌,并将其添加到基于原始消息的新消息的标头中。最后,您将这个新消息发送到IntegrationFlow的输入通道以进行HTTP请求。当一切正常时,HTTP调用的结果将从标头转发到所提到的replyChannel,从而转发到下一步的主流。

最新更新