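I'm new to Reactor and would like to understand the difference between the following two pieces of code.
- What is the difference between the two? (The rest of the code is given at the end.)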
    // Version A
    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        return sample()
            .flatMap {
                val newRequest = exchange.request.mutate()
                    .headers { httpHeaders -> httpHeaders.set("sample", it) }
                    .build()
                chain.filter(exchange.mutate().request(newRequest).build())
            }
            .onErrorResume(SampleException::class.java) { ex -> exchange.mutateToError() }
            .onErrorResume { exchange.mutateToError() }
    }
and
    // Version B
    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        return sample()
            .onErrorResume(SampleException::class.java) { ex -> exchange.mutateToError() }
            .onErrorResume { exchange.mutateToError() }
            .flatMap {
                val newRequest = exchange.request.mutate()
                    .headers { httpHeaders -> httpHeaders.set("sample", it) }
                    .build()
                chain.filter(exchange.mutate().request(newRequest).build())
            }
    }
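I understand that Version B does not compile, because the two onErrorResume() calls return Mono<Void> while sample() returns Mono<String>. Setting that aside, my understanding is that in Version A the two onErrorResume() calls are triggered by an error of any kind, including one raised during chain.filter(exchange.mutate().request(newRequest).build()) (that is, they handle any error in the chain). In Version B, on the other hand, the two onErrorResume() calls would be triggered only by an error raised during sample(). Is my understanding correct?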
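- If so, how should I correct Version B? My intent is to "catch" only the errors from sample(), not errors from the whole processing chain as in Version A, but for Version B to work the two onErrorResume calls would have to yield a Mono<String>.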
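To make the placement question concrete, here is a minimal sketch that isolates it, assuming only reactor-core on the classpath. The names failingDownstream, handlerAfter and handlerBefore are hypothetical stand-ins (failingDownstream plays the role of chain.filter(...) failing); this is not the gateway code itself.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical stand-in for chain.filter(...): always fails downstream.
fun failingDownstream(value: String): Mono<String> =
    Mono.error<String>(IllegalStateException("downstream failed for $value"))

// Handler placed last (as in Version A): it also sees errors raised inside flatMap.
fun handlerAfter(source: Mono<String>): Mono<String> =
    source
        .flatMap { failingDownstream(it) }
        .onErrorResume { Mono.just("recovered") }

// Handler placed first (as in Version B): it only sees errors coming from the source.
fun handlerBefore(source: Mono<String>): Mono<String> =
    source
        .onErrorResume { Mono.just("recovered") }
        .flatMap { failingDownstream(it) }

fun main() {
    // The source succeeds in both cases; only the downstream step fails.
    println(handlerAfter(Mono.just("token")).block())
    println(runCatching { handlerBefore(Mono.just("token")).block() }.isFailure)
}
```

In this sketch the fallback type matches the source type (both are String), which is exactly what Version B lacks: there the fallback is a Mono<Void> while the source is a Mono<String>.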
Relevant code:
    class SampleException : RuntimeException()

    class SampleConfig

    class SampleGatewayFilter(
        private val config: SampleConfig,
        private val webClient: WebClient,
    ) : GatewayFilter {
        override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
            return sample()
                .flatMap {
                    val newRequest = exchange.request.mutate()
                        .headers { httpHeaders -> httpHeaders.set("sample", it) }
                        .build()
                    chain.filter(exchange.mutate().request(newRequest).build())
                }
                .onErrorResume(SampleException::class.java) { ex -> exchange.mutateToError() }
                .onErrorResume { exchange.mutateToError() }
        }

        private fun ServerWebExchange.mutateToError(): Mono<Void> {
            val payload = """{"foo": "bar"}""".toByteArray(Charsets.UTF_8)
            response.statusCode = HttpStatus.UNAUTHORIZED
            response.headers.contentLength = payload.size.toLong()
            response.headers.contentType = MediaType.APPLICATION_JSON
            val dataBuffer = response.bufferFactory().wrap(payload)
            return response.writeWith(Mono.just(dataBuffer))
        }

        private fun sample(): Mono<String> {
            return webClient.get()
                .uri("/api/v1/sample")
                .retrieve()
                .onStatus({ s -> s.is4xxClientError || s.is5xxServerError }, { Mono.empty() })
                .bodyToMono<CommonApiResponse<String>>()
                .onErrorResume { e ->
                    logger.warn(e) { "Exception in sample" }
                    Mono.error(e)
                }.flatMap {
                    if (it.result == ResultType.SUCCESS) {
                        Mono.just(it.data)
                    } else {
                        Mono.error(SampleException())
                    }
                }
        }
    }
I used flatMapMany() instead of chaining onErrorResume() and flatMap().
    // Version B
    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        return sample()
            .flatMapMany(
                {
                    val newRequest = exchange.request.mutate()
                        .headers { httpHeaders -> httpHeaders.set("sample", it) }
                        .build()
                    chain.filter(exchange.mutate().request(newRequest).build())
                },
                { ex ->
                    when (ex) {
                        is SampleException -> exchange.mutateToError()
                        else -> exchange.mutateToError()
                    }
                },
                { Mono.empty() }
            ).next()
    }
I had to use next() because I couldn't find a way to supply a "none" Flux, rather than an "empty" one, for onComplete. Otherwise, I would have used single().
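For anyone who wants to try the three-argument flatMapMany() outside the gateway, here is a minimal standalone sketch, assuming only reactor-core on the classpath. The helper name route and the string results are hypothetical and exist only to make the three branches visible.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper: routes each signal of `source` to its own branch.
fun route(source: Mono<String>): Mono<String> =
    source.flatMapMany(
        { value -> Mono.just("next:$value") },          // source emitted a value
        { error -> Mono.just("error:${error.message}") }, // source failed
        { Mono.empty<String>() }                        // source completed
    ).next()

fun main() {
    println(route(Mono.just("token")).block())
    println(route(Mono.error<String>(IllegalStateException("boom"))).block())
    println(route(Mono.empty<String>()).block())
}
```

As far as I can tell, the error mapper here only receives errors signalled by the source Mono itself, so a failure inside the publisher returned by the first lambda is passed downstream rather than routed to the second lambda, which is the behaviour I wanted for sample().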