GatewayFilter 中的 Reactor 机制



我是 Reactor 新手,想了解以下两段代码之间的区别。

  1. 两者有何不同?(其余代码在末尾给出)
// Version A
/**
 * Version A: the error handlers are attached after `flatMap`.
 *
 * They sit downstream of both `sample()` and the publisher returned by
 * `chain.filter(...)` inside `flatMap`, so an error signalled by either one
 * reaches the `onErrorResume` operators below.
 */
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
return sample()
.flatMap {
// `it` is the String emitted by sample(); it becomes the "sample" request header.
val newRequest = exchange.request.mutate()
.headers { httpHeaders -> httpHeaders.set("sample", it) }
.build()
// Continue the gateway chain with an exchange that carries the mutated request.
chain.filter(exchange.mutate().request(newRequest).build())
}
// Typed handler first, then a catch-all; both produce the same error response here.
.onErrorResume(SampleException::class.java) { ex -> exchange.mutateToError() }
.onErrorResume { exchange.mutateToError() }
}

// Version B
/**
 * Version B: the error handlers are attached directly to `sample()`, before
 * `flatMap`, so they are upstream of `chain.filter(...)`.
 *
 * As posted this does not type-check: `sample()` is a `Mono<String>`, but the
 * fallbacks return `exchange.mutateToError()`, which is a `Mono<Void>`.
 *
 * NOTE(review): one possible fix is to let each fallback write the error
 * response and then complete empty as a `Mono<String>`, e.g.
 * `exchange.mutateToError().then(Mono.empty<String>())`, so that `flatMap`
 * is skipped after an error — confirm against the Reactor version in use.
 */
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
return sample()
// Fallbacks placed before flatMap: only errors signalled by sample() reach them.
.onErrorResume(SampleException::class.java) { ex -> exchange.mutateToError() }
.onErrorResume { exchange.mutateToError() }
.flatMap {
// `it` is the String emitted by sample(); it becomes the "sample" request header.
val newRequest = exchange.request.mutate()
.headers { httpHeaders -> httpHeaders.set("sample", it) }
.build()
// Errors signalled from here on are not covered by the handlers above.
chain.filter(exchange.mutate().request(newRequest).build())
}
}

我理解版本B无法工作,因为两个onErrorResume()返回的是Mono<Void>,而sample()返回的是Mono<String>。但除此之外,我想确认的是:对于版本A,即使错误发生在chain.filter(exchange.mutate().request(newRequest).build())期间(即过滤器链后续处理中的任何错误),两个onErrorResume()也会被触发;而对于版本B,两个onErrorResume()只有在sample()期间出现错误时才会触发。我的理解对吗?

  2. 如果是这样,我该如何修正版本B?我的意图只是"捕获"sample()产生的错误,而不是像版本A那样捕获整个处理链中的错误;但版本B要求两个onErrorResume()返回Mono<String>。

相关代码:

/** Signalled by `sample()` when the decoded response's `result` is not `ResultType.SUCCESS`. */
class SampleException : RuntimeException()
/** Configuration type handed to `SampleGatewayFilter`; it declares no members in this snippet. */
class SampleConfig
/**
 * Gateway filter that fetches a value through [sample], copies it into the
 * `sample` request header and forwards the mutated exchange down the chain.
 *
 * Any error reaching the end of the pipeline is answered with the fixed
 * 401 JSON response produced by [mutateToError].
 */
class SampleGatewayFilter(
    private val config: SampleConfig,
    private val webClient: WebClient,
) : GatewayFilter {

    /**
     * Enriches the request with the `sample` header and continues the chain.
     *
     * The error handlers are attached after `flatMap`, so they are downstream
     * of both [sample] and the publisher returned by `chain.filter(...)`.
     *
     * @param exchange the current server exchange
     * @param chain the remaining gateway filter chain
     * @return a [Mono] that completes when the chain, or the error response, completes
     */
    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        return sample()
            .flatMap { sampleValue ->
                val newRequest = exchange.request.mutate()
                    .headers { httpHeaders -> httpHeaders.set("sample", sampleValue) }
                    .build()
                chain.filter(exchange.mutate().request(newRequest).build())
            }
            // Typed handler first, then a catch-all; both currently write the same response.
            .onErrorResume(SampleException::class.java) { _ -> exchange.mutateToError() }
            .onErrorResume { exchange.mutateToError() }
    }

    /**
     * Writes a fixed JSON payload to the response with status 401 (UNAUTHORIZED),
     * setting `Content-Length` and `Content-Type: application/json` beforehand.
     *
     * @return the [Mono] returned by `response.writeWith(...)`
     */
    private fun ServerWebExchange.mutateToError(): Mono<Void> {
        val payload = """{"foo": "bar"}""".toByteArray(Charsets.UTF_8)
        response.statusCode = HttpStatus.UNAUTHORIZED
        response.headers.contentLength = payload.size.toLong()
        response.headers.contentType = MediaType.APPLICATION_JSON
        val dataBuffer = response.bufferFactory().wrap(payload)
        return response.writeWith(Mono.just(dataBuffer))
    }

    /**
     * Calls `GET /api/v1/sample` and decodes the body as `CommonApiResponse<String>`.
     *
     * Emits the response's `data` when `result` is `ResultType.SUCCESS`;
     * otherwise signals a [SampleException]. Errors raised before that check
     * are logged at WARN level and propagated unchanged.
     */
    private fun sample(): Mono<String> {
        return webClient.get()
            .uri("/api/v1/sample")
            .retrieve()
            // NOTE(review): returning Mono.empty() here presumably suppresses WebClient's
            // default 4xx/5xx error so the body is still decoded — confirm for the Spring
            // version in use.
            .onStatus({ s -> s.is4xxClientError || s.is5xxServerError }, { Mono.empty() })
            .bodyToMono<CommonApiResponse<String>>()
            // Side-effect-only logging: doOnError observes the error and lets it propagate,
            // replacing the previous onErrorResume { log; Mono.error(e) } round trip.
            .doOnError { e -> logger.warn(e) { "Exception in sample" } }
            .flatMap { apiResponse ->
                if (apiResponse.result == ResultType.SUCCESS) {
                    Mono.just(apiResponse.data)
                } else {
                    Mono.error(SampleException())
                }
            }
    }
}

我改用flatMapMany(),代替把onErrorResume()和flatMap()链接起来的写法:

// Version B
/**
 * Version B rewritten with the three-argument `flatMapMany`, which takes one
 * mapper each for the value, the error and the completion signal of `sample()`.
 *
 * The error mapper is attached to `sample()` itself, which is the stated goal:
 * handle failures of `sample()` without wrapping the rest of the chain.
 */
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
return sample()
.flatMapMany(
{
// Value mapper: `it` is the String emitted by sample().
val newRequest = exchange.request.mutate()
.headers { httpHeaders -> httpHeaders.set("sample", it) }
.build()
chain.filter(exchange.mutate().request(newRequest).build())
},
{ ex ->
// Error mapper: both branches currently write the same error response.
when (ex) {
is SampleException -> exchange.mutateToError()
else -> exchange.mutateToError()
}
},
// Completion mapper: contributes an empty publisher.
{ Mono.empty() }
// flatMapMany yields a Flux; next() turns it back into the Mono<Void> the signature needs.
// NOTE(review): `.then()` on the Flux would presumably express "just wait for
// completion" more directly than `.next()` — confirm before changing.
).next()
}

我不得不使用next(),因为我找不到办法在onComplete时提供"无"(none),而只能提供一个"空的"Flux(即Mono.empty());否则,我本会使用single()。

相关内容

  • 没有找到相关文章

最新更新