Spring Cloud Gateway: Post Filter Web Client Request



我们使用Spring Cloud Gateway是为了将请求路由到多个底层服务。对这些底层服务的调用将是连续的,并且可能会相互馈送(来自一个服务的响应将用于对下一个服务的请求)。当我们需要在主请求之前顺序地发出这些请求时,我们有一个工作解决方案,但是在主请求之后,我们在将一个代理请求的响应馈送到下一个请求时遇到问题。

我们计划将响应从一个请求提供给下一个请求的方式是使用GatewayFilter中的WebClient发出请求,并将响应字符串存储在交换器的属性存储中。然后,在下一个代理请求期间,我们提供一个属性名,可选地从中提取请求体。当使用"pre"时效果很好。过滤器,因为第一个代理请求是在第二个请求构建和执行之前构建、执行和缓存响应的,所以属性链按预期工作。当使用& post"过滤器。在post代理中,web客户端请求都是在后续请求完成之前构建的。因此,属性存储永远不会有来自前一个请求的响应,这意味着下一个请求不能按预期工作,因为它没有有效的请求体。

我的理解是,调用chain.filter(exchange).then(Mono.fromRunnable{ ... })将导致.then逻辑只有在先前的过滤器完全完成后才执行。但事实似乎并非如此。在其他类型的过滤器中,如日志、响应操作等,post过滤器按正确的顺序执行,但在创建WebClient时,它们似乎不是这样。

有没有人对如何实现这种期望的行为有任何想法?

预代理过滤代码(工作):

class PreProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PreProxyGatewayFilterFactory.Params>(Params::class.java) {
override fun apply(params: Params): GatewayFilter {
return OrderedGatewayFilter(
{ exchange, chain ->
ServerWebExchangeUtils.cacheRequestBody(exchange){
val cachedExchange = exchange.mutate().request(it).build()
executeRequest(cachedExchange, params)
.map { response ->
val body = response.body.toString()
cacheResponse(
response.body.toString(),
params.cachedResponseBodyAttributeName,
cachedExchange
)
}
.flatMap(chain::filter)
}
}, params.order)
}
private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
if(!attributeName.isNullOrBlank()){
exchange.attributes[attributeName] = response
}
return exchange
}
private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
val request = when(exchange.request.method){
HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
else -> throw Exception("Invalid request method passed in to the proxy filter")
}
return request.headers { headers ->
headers.addAll(exchange.request.headers)
headers.remove(CONTENT_LENGTH)
}
.exchange()
.flatMap{ response ->
response.toEntity(String::class.java)
}
}
private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
val cachedBody = attributeName?.let { attrName ->
exchange.getAttributeOrDefault<String>(attrName, "null")
} ?: "null"
return if(cachedBody != "null"){
BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
} else {
BodyInserters.fromDataBuffers(exchange.request.body)
}
}
data class Params(
val proxyPath: String = "",
val cachedRequestBodyAttributeName: String? = null,
val cachedResponseBodyAttributeName: String? = null,
val order: Int = 0
)
}

后代理过滤代码(不工作)

class PostProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
override fun apply(params: Params): GatewayFilter {
return OrderedGatewayFilter(
{ exchange, chain ->
ServerWebExchangeUtils.cacheRequestBody(exchange){
val cachedExchange = exchange.mutate().request(it).build()
//Currently using a cached body does not work in post proxy
chain.filter(cachedExchange).then( Mono.fromRunnable{
executeRequest(cachedExchange, params)
.map { response ->
cacheResponse(
response.body.toString(),
params.cachedResponseBodyAttributeName,
cachedExchange
)
}
.flatMap {
Mono.empty<Void>()
}
})
}
}, params.order)
}
private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
if(!attributeName.isNullOrBlank()){
exchange.attributes[attributeName] = response
}
return exchange
}
private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
val request = when(exchange.request.method){
HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
else -> throw Exception("Invalid request method passed in to the proxy filter")
}
return request.headers { headers ->
headers.addAll(exchange.request.headers)
headers.remove(CONTENT_LENGTH)
}
.exchange()
.flatMap{ response ->
response.toEntity(String::class.java)
}
}
private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
val cachedBody = attributeName?.let { attrName ->
exchange.getAttributeOrDefault<String>(attrName, "null")
} ?: "null"
return if(cachedBody != "null"){
BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
} else {
BodyInserters.fromDataBuffers(exchange.request.body)
}
}

data class Params(
val proxyPath: String = "",
val cachedRequestBodyAttributeName: String? = null,
val cachedResponseBodyAttributeName: String? = null,
val order: Int = 0
)
}

终于能够从属性中获得post filter代理的请求体的工作解决方案。这是一个相对简单的解决方案,我只是找不到答案。不用chain.filter(exchange).then(Mono.fromRunnable { ...execute proxy request...}),我只需要用chain.filter(exchange).then(Mono.defer { ...execute proxy request...})

相关内容

  • 没有找到相关文章

最新更新