返回 Mono 会导致<ServerResponse>(邪恶的)同步,阻止客户端/服务器通信吗?



我是Spring Actor和WebFlux的新手,对Spring functional web中的事件流有点困惑。 示例:我有一个返回Mono<ServerResponse>的处理程序函数。在其中,执行一个findAll()存储库方法,返回一个Flux<T>。根据反应式宣言,为了异步、不阻塞和允许背压,我希望看到从存储库返回的每个元素的onNext()。但是,在请求处理期间查看服务器日志,我只看到一个onNext()事件,这是有意义的,因为我的返回类型是包含响应的Mono

路由器功能

@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}

处理程序函数

Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}

事件日志

2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | request(unbounded)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onComplete()

相比之下,实现一个经典的 Spring 注释控制器方法,Flux<T>作为返回类型,我将看到每个T实例(即结果集的每个项目)的onNext(),这对我来说看起来更"正确"(客户端现在可以控制事件流等):

控制器

@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}

日志

2020-05-10 15:14:04.135  INFO 19096 --- [ctor-http-nio-5] GET items                                : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136  INFO 19096 --- [ctor-http-nio-5] GET items                                : request(unbounded)
2020-05-10 15:14:04.137  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onComplete()

这是令人困惑的。让我详细说明一下:

  • 使用Mono<ServerResponse>似乎很邪恶,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破异步、非阻塞、支持背压的事件流的响应性原则。这不会剥夺客户端的控制权吗?对我来说,这看起来像传统的、阻塞的客户端/服务器通信。
  • 直接返回Flux<T>感觉要好得多,因为它支持按结果事件处理和背压控制。

我的问题是:

  • 创建Mono<ServerResponse>的含义是什么?这是否会导致阻塞、同步交互,仅在从存储库读取所有项时才发出onNext()?我会失去背压功能等吗?
  • 如何让函数样式后端为结果集中的每个项目发送onNext()
  • 完全响应式(即非阻塞、异步和背压兼容)的函数样式处理程序函数的返回类型而言,最佳实践是什么?我不确定Mono<ServerResponse>是否没有打破这些反应性原则。

我可能完全错了或错过了一些重要的东西。感谢您的帮助!

这完全取决于使用ServerResponse的客户端。根据 WebFlux 文档 (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux),设置处理程序函数以返回Mono<ServerResponse>无论返回的项目数量如何都是标准方法,绝对没问题- 只要客户端正确处理底层Flux<T>一切都很好。出现我的问题是因为我使用curl测试了端点,它无法检测到底层Flux。使用启用函数式的客户端(如org.springframework.web.reactive.function.client.WebClient),可以将Mono<ServerResponse>首先反序列化为Flux<T>,启用所有不错的响应式功能,并使我们的onNext()事件显示出来。

客户端代码

像这样调用后端,将服务器响应反序列化为 Flux:

@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
return  webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
.retrieve()
.bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
.log("GET all items from server");
}

将导致查看所有onNext()事件,从而启用客户端事件处理:

2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : request(unbounded)
2020-05-10 16:10:10.511  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onComplete()

因此,只要客户端正确处理响应,一切都很好,完全反应。

最新更新