我可以将Flux作为ServerResponse主体的字段吗



我是Spring Reactive Web的新手,遇到了以下问题。我想创建一个带有端点的微服务a,该端点接受数字N,向微服务B发送N个请求(每个请求返回一个字符串(,将字符串包装成对象,将它们组合成List/Flux(?(,并返回带有这些对象的JSON,如:

{
"number": 4,
"objects": [
{
"name": "first"
},
{
"name": "second"
},
{
"name": "third"
},
{
"name": "fourth"
}
]
}

我想为此使用一个函数端点。所以我尝试了以下方法(尽我所能简化它(:

public class MyObject {
private String name; // here should be a value received from B
// ...
}
public class MyResponse {
private int number;
private Flux<MyObject> objects; // or List?
// ...
}
@Component
@RequiredArgsConstructor
public class MyHandler {
private final MyClient client;
public Mono<ServerResponse> generate(ServerRequest serverRequest) {
return serverRequest.bodyToMono(MyRequestBody.class)
.flatMap(request -> buildServerResponse(HttpStatus.OK, buildResponseBody(request)));
}
private Mono<ServerResponse> buildServerResponse(HttpStatus status, Mono<MyResponse> responseBody) {
return ServerResponse.status(status)
.contentType(MediaType.APPLICATION_JSON)
.body(responseBody, MyResponse.class);
}
private Mono<MyResponse> buildResponseBody(MyRequestBody request) {
return Mono.just(MyResponse.builder()
.number(request.getNumber())
.objects(getObjects(request.getNumber())
.build());
}
private Flux<MyObject> getObjects(int n) {
// how to receive n strings from MyClient, make MyObject from each of them and then combine them together to a Flux/List?
}
public class MyClient {
public Mono<String> getName() {
WebClient client = WebClient.builder().baseUrl(getUrl()).build();
return client.get()
// ...
.retrieve()
.bodyToMono(String.class);
}
private String getUrl() {
// ...
}
}

所以,如果我在MyResponse中使用Flux,我会收到这样的响应:

{
"number": 4,
"objects": {
"prefetch": 2147483647,
"scanAvailable": true
}
}

另一方面,如果我尝试使用List,它似乎需要在某个时候进行阻塞,并且我会收到与之相关的错误。那么,我该怎么做呢?

提前感谢!


UPDATE:如果我使用collectList().block()从通量中生成列表,我会收到这个:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread <...>

正如我从这个问题的答案中所理解的,当我的方法返回Mono/Flux时,我永远不应该阻塞。将block()调用从返回Mono/Flux的方法中分离出来并没有帮助。如果我在block()之前使用share(),那么我的请求将永远执行,原因我还不明白。

好吧,我成功了。

Flux作为一个字段不能以期望的方式工作,所以我需要一个List

public class MyResponse {
private int number;
private List<MyObject> objects;
// ...
}

现在我需要一种从多个Mono<String>中生成List<MyObject>的方法,其中每个MyObject都有一个String字段。

问题是,我们从来没有摆脱过MonoFlux,所以我们先选择Flux<MyObject>

private Flux<MyObject> getObjects(int n) {
return Flux.range(0, n) // Flux<Integer>
.map(i -> myClient.getName()) // Flux<String>
.map(name -> new MyObject(name)); // Flux<MyObject>
}

然后我们制作Flux:

private Mono<MyResponse> buildResponseBody(MyRequestBody request) {
return getObjects(request.getNumber()) // Flux<MyObject>
.collectList() // Mono<List<MyObject>>
.map(objects -> MyResponse.builder() // Mono<MyResponse>
.number(request.getNumber())
.objects(objects)
.build()));
}

这样就可以了,因为我们不必阻止任何事情。

只有当我们想在某个时刻去掉Mono/Flux时,问题才会出现,比如我们想要一个纯的List<MyObject>。但只要我们有一个Mono和/或Flux作为输入和输出,我们就可以用这些类的方法进行所有操作,在每个阶段保留MonoFlux

相关内容

最新更新