了解 Spring 的 Web 响应式框架



我目前正在开发一个带有SpringBoot 2、netty 和jOOQ上的spring-boot-starter-webflux的应用程序。
以下是我在数小时的研究和堆栈溢出搜索后想出的代码。我已经建立了很多 日志记录以查看哪个线程上发生了什么。

用户控制器:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

用户服务:

public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}

用户道:

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}

代码按预期工作,"接收请求"和"发送响应"都在同一线程上运行(reactor-http-server-epoll-x) 而阻塞代码(对imUserDao.insertUser(u)的调用)在弹性调度程序线程(elastic-x)上运行。 事务绑定到调用注释方法的线程(即 elastic-x),因此按预期工作(我已经测试过 它使用不同的方法,未在此处发布,以保持简单)。

下面是一个日志示例

20:57:21,384 DEBUG         admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG            admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG        admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG         tools.LoggerListener| Executing query          
...
20:57:21,401 DEBUG              tools.StopWatch| Finishing                : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG         admin.UserController| Sending response on thread: reactor-http-server-epoll-7

我已经研究响应式编程很长时间了,但从来没有完全编程过任何响应式编程。现在我是,我想知道我做得是否正确。 所以这是我的问题:

1.上面的代码是处理传入的HTTP请求,查询数据库然后响应的好方法吗? 请忽略我内置的 logger.debug(...) 调用,以保持理智:)我有点期望有一个Flux作为控制器方法的参数,从某种意义上说,我有多个潜在请求的流 这将在某个时候到来,并将以相同的方式处理。相反,我找到的示例会在每次收到请求时创建一个Mono.from(...)。

2. 在用户服务中创建的第二个 Mono (Mono.just(user) )感觉有些尴尬。我知道我需要开始一个新的流才能 在弹性调度程序上运行代码,但没有执行此操作的运算符吗?

3.从代码的编写方式来看,我了解到用户服务内部的Mono将被阻止,直到DB操作完成, 但是,为请求提供服务的原始流不会被阻止。这是对的吗?

4. 我计划将Schedulers.elastic()替换为并行调度程序,我可以在其中配置工作线程的数量。这个想法是,最大工作线程数应与最大数据库连接数相同。 当调度程序中的所有工作线程都繁忙时会发生什么?那是背压跳进来的时候吗?

5.我最初希望在我的控制器中包含以下代码:

return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));

但我无法实现这一目标并保持事物在正确的线程中运行。有什么方法可以在我的代码中实现这一点吗?

任何帮助将不胜感激。谢谢!

服务和控制器
服务阻塞的事实是有问题的,因为在控制器中,您正在从未在单独线程上移动的map内部调用阻塞方法。这可能会阻止所有控制器。

相反,您可以做的是从UserService#create返回Mono(删除末尾的block())。由于该服务确保 Dao 方法调用是隔离的,因此问题较少。从那里,无需在控制器中执行Mono.just(user):只需调用 create 并直接在生成的 Mono 上开始链接运算符:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
//this log as you saw was executed in the same thread as the controller method
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return userService.create(user)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

日志记录
请注意,如果您想记录某些内容,有几个比执行map并返回it更好的选择:

  • doOnNext方法就是为此量身定制的:对其中一个反应信号做出反应(在本例中为onNext:发出一个值)并执行一些非变异动作,使输出序列与源序列完全相同。doOn 的"副作用"可能是写入控制台或增加统计计数器,例如......还有doOnComplete,doOnError,doOnSubscribe,doOnCancel等。

  • log只是按照上面的顺序记录所有事件。它将检测您是否使用 SLF4J,如果是,则在 DEBUG 级别使用配置的记录器。否则,它将使用 JDK 日志记录功能(因此您还需要将其配置为显示 DEBUG 级别日志)。

关于事务或任何依赖于
ThreadLocal 和线程粘性ThreadLocal在响应式编程中可能会有问题,因为底层执行模型在整个序列中保持不变的保证较少。Flux可以分几个步骤执行,每个步骤在不同的Scheduler(以及线程或线程池)。即使在特定步骤中,一个值也可以由底层线程池的线程 A 处理,而稍后到达的下一个值将在线程 B 上处理。

在这种情况下,依赖 Thread Local 就不那么简单了,我们目前正在积极努力提供更适合响应式世界的替代方案。

您创建连接池大小的池的想法很好,但不一定足够,因为事务通量可能会使用多个线程,因此可能会污染事务中的某些线程。

当池的线程
用完时会发生什么 如果您使用特定的Scheduler来隔离阻塞行为,如下所示,一旦线程用完,它将抛出RejectedExecutionException

最新更新