在 JAX-RS 驱动的服务器中使用反应器时如何处理阻塞调用?



要处理HTTP请求,我们必须将阻塞调用(例如JDBC调用)作为基于Mono/Flux的进程的一部分。 我们目前的计划如下所示:

// I renamed getSomething to processJaxrsHttpRequest
CompletionStage<String> processJaxrsHttpRequest(String input) {
return Mono.just(input)
.map(in -> process(in))
.flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
.flatMap(str -> asyncHttpCall(str))
.flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
.toFuture();
}

其中fixedScheduler在 HTTP 请求中并发使用。

我们希望得到一些关于这种策略的反馈,以便在相当数量的通量中处理块调用。 当然,我们知道,如果我们的所有请求都通过这些阻塞调用流动,那么我们最好不要使用 actor(在公认的很好的处理 API 之外)。

更新:感谢bsideup的这个答案。 但是,我应该更具体地回答我的问题。

我的总体问题是如何有效地跨多个通量使用阻塞调用,因为这些通量可以大量创建/订阅。 我们尝试了建议的方法,但它会导致线程爆炸式增长并快速 OOM。 因此,我们正在考虑使用共享调度程序。 所以。。这是我的问题。

  1. 使用共享调度程序(fixedScheduler)在我描述的情况下你会建议吗? 如果没有,你会指出我的任何方向吗?
  2. 如果使用共享调度程序很好,这会是一个很好的实现吗:Schedulers.newParallel("blocking-scheduler", maxNumThreads)

更新2:只是在Schedulers#newParallel上挖了一个大块,并意识到这是行不通的,因为它"拒绝"了阻止呼叫。

真的很感谢任何提示!

虽然subscribeOn确实是处理阻塞呼叫的一种方式,并且您的使用是可以的,但您也可以使用publishOn
它将处理移动到提供的Scheduler,除非指定了其他publishOn

CompletionStage<String> getSomething(String input) {
return Mono.just(input)
.map(in -> process(in)) // process must be non-blocking, or go after publishOn
.publishOn(Schedulers.boundedElastic())
.map(::jdbcCall)
.flatMap(str -> asyncHttpCall(str))
.publishOn(Schedulers.boundedElastic())
.map(::jdbcCall)
.toFuture();
}

如您所见,您也可以继续使用异步调用。只要确保您没有阻止非阻塞调度程序(在该示例中,我在flatMap后再次使用publishOn,因为asyncHttpCall可能会从非阻塞调度程序完成)

最新更新