Spring Webflux:如何使用不同的线程进行请求和响应



我正在使用Spring Webflux,据我了解,通过使用它,用于接收请求的线程和用于响应的线程应该不同。但是,无论我使用 netty 还是 undertow,我最终都会使用相同的线程。

我的应用程序是一个简单的带有MySQL DB的crud应用程序。我没有使用 r2dbc,而是使用 jdbc 与执行器和调度程序相结合。

如下面的日志所示,请求由线程 [ XNIO-1 I/O-6] 处理,响应由同一个线程给出。通过这种方式,我假设线程被阻止,直到数据库操作完成。我该如何解决这个问题?

这是日志

2019-07-23 17:49:10.051  INFO 132 --- [           main] org.xnio                                 : XNIO version 3.3.8.Final
2019-07-23 17:49:10.059  INFO 132 --- [           main] org.xnio.nio                             : XNIO NIO Implementation Version 3.3.8.Final
2019-07-23 17:49:10.114  INFO 132 --- [           main] o.s.b.w.e.undertow.UndertowWebServer     : Undertow started on port(s) 8080 (http)
2019-07-23 17:49:10.116  INFO 132 --- [           main] c.n.webflux.demo.WebfluxFunctionalApp    : Started WebfluxFunctionalApp in 1.262 seconds (JVM running for 2.668)
2019-07-23 17:49:10.302 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4c85975] HTTP GET "/api/findall"
2019-07-23 17:49:10.322 DEBUG 132 --- [   XNIO-1 I/O-6] s.w.r.r.m.a.RequestMappingHandlerMapping : [4c85975] Mapped to public reactor.core.publisher.Mono<java.util.List<com.webflux.demo.model.TypeStatus>> com.webflux.demo.controller.MonitoringController.findAll()
2019-07-23 17:49:10.337 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/json;charset=UTF-8' given [*/*] and supported [application/json;charset=UTF-8, application/*+json;charset=UTF-8, text/event-stream]
2019-07-23 17:49:10.338 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [4c85975] 0..1 [java.util.List<com.webflux.demo.model.TypeStatus>]
2019-07-23 17:49:10.347  INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2019-07-23 17:49:10.785  INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2019-07-23 17:49:10.838 DEBUG 132 --- [pool-1-thread-1] org.springframework.web.HttpLogging      : [4c85975] Encoding [[com.webflux.demo.model.TypeStatus@7b4509cb, com.webflux.demo.model.TypeStatus@22676ebe, (truncated)...]
2019-07-23 17:49:10.949 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4c85975] Completed 200 OK

我的道也是

@Repository
public class TypeStatusJdbcTemplate {
    private JdbcTemplate jdbcTemplate;
    public TypeStatusJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    private final static String SQL_FIND_ALL = "select * from `monitoring`.`type_status` limit 3";

    public List<TypeStatus> findAll() {
        return jdbcTemplate.query(SQL_FIND_ALL,
                new TypeStatusMapper());
    }
}

服务是

@Service
public class MonitoringService {
    private final Scheduler scheduler;
    private TypeStatusJdbcTemplate repository;
    public MonitoringService(Scheduler scheduler, TypeStatusJdbcTemplate repository) {
        this.scheduler = scheduler;
        this.repository = repository;
    }
    public Mono<List<TypeStatus>> findAll() {
        return Mono.fromCallable(repository::findAll).subscribeOn(scheduler);
    }
}

控制器是

@RestController
@RequestMapping("/api")
public class MonitoringController {
    private final MonitoringService monitoringService;
    private static final Logger logger = LoggerFactory.getLogger(MonitoringController.class);
    public MonitoringController(MonitoringService monitoringService) {
        this.monitoringService = monitoringService;
    }
    @GetMapping(value="/findall")
    public Mono<List<TypeStatus>> findAll() {
        return monitoringService.findAll();
    }
}

主文件(显示调度程序(

@SpringBootApplication
public class WebfluxFunctionalApp {
    public static void main(String[] args){
        SpringApplication.run(WebfluxFunctionalApp.class, args);
    }

    @PostConstruct
    public void init(){
        // Setting Spring Boot SetTimeZone
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(30));
    }
}

线程执行并不总是不同的线程。摘自响应式文档:

反应式调度程序

获得助焊剂或单声道并不一定意味着它将在专用线程中运行。相反,大多数运算符继续在执行前一个运算符的线程中工作。除非指定,否则最顶层的运算符(源(本身在进行 subscribe(( 调用的线程上运行。

所以没有什么说它必须是一条新线程。

最新更新