使用Reactive MongoDB进行流式传输,耗时操作被取消



问题介于Project Reactor和Reactive MongoDB(Spring Data(之间。

当执行包含(按以下顺序(的流时:

  1. Reactive MongoDB上操作的方法,非常快速
  2. 花费30秒以上的方法

流被取消(查看下面的代码和日志(

@GetMapping("/test/{msg}")
public Mono<SomeObject> test(@PathVariable String msg) {
return repository.findByMessage(msg).log("1")
.map(someObj -> delaySeconds(someObj, 35)).log("2");
}

正如您所看到的,在30秒后,流被取消,但在另外5秒后(超时为35秒(,会执行事件onNext

12:59:18.556 [Thread-9] INFO  com.why.temp.TempController - Saved:SomeObject(id=5b604106ef301746a86665f3, message=WHY)
12:59:18.591 [http-nio-8080-exec-2] INFO  1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
12:59:18.592 [http-nio-8080-exec-2] INFO  2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
12:59:18.593 [http-nio-8080-exec-2] INFO  2 - | request(unbounded)
12:59:18.593 [http-nio-8080-exec-2] INFO  1 - | request(unbounded)
12:59:18.612 [Thread-8] INFO  1 - | onNext(SomeObject(id=5b604106ef301746a86665f3, message=WHY))
12:59:49.116 [http-nio-8080-exec-3] INFO  2 - | cancel()
12:59:49.117 [http-nio-8080-exec-3] INFO  1 - | cancel()
12:59:53.612 [Thread-8] INFO  2 - | onNext(SomeObject(id=5b604106ef301746a86665f3, message=WHY))

你能解释一下流被取消的原因吗?我该如何处理?

是否有应该增加的超时,或者我是否以错误的方式使用Project Reactor Stream API和MongoDB?

这是我的MongoDB配置

@Bean
public ReactiveMongoTemplate reactiveMongoTemplate() {
ConnectionString str = new ConnectionString(env.getMongoUri());
return new ReactiveMongoTemplate(MongoClients.create(str), str.getDatabase());
}

知道吗?如果你有类似的问题,请投赞成票。

解决方法很简单,但不那么优雅:

@GetMapping("/test/{msg}")
public Mono<SomeObject> test(@PathVariable String msg) {
SomeObj someObj = repository.findByMessage(msg).block();
return Mono.just(someObj).log("1")
.map(someObj -> delaySeconds(someObj, 35)).log("2");
}

我也有类似的问题,当反应式操作链花费了超过魔术般的30秒。在我的案例中,这是Spring MVC请求超时,下面是提示:

Spring Boot管理日志中的递归AsyncRequestTimeoutException

spring.mvc.async.request-timeout的最大故障值为30s。

我相信它会有所帮助:(。

干杯!

最新更新