在我的Spring Webflux API网关中,我通过REST从微服务接收Flux:
public Flux<MyObject> getMyObjects(String id) {
Flux<MyObject> myObjects = webClient.get().uri(nextServerUrl + "/myobject" + issueId)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(MyObject.class);
return myObjects;
}
我必须在 API 网关中重新排列微服务接收的信息,以便响应客户端。我尝试以两种方式做到这一点:
- 尽可能使用助焊剂:
private Rearranged createClientResponse(String id) {
Rearranged rearranged = new Rearranged();
Flux<MyObject> myObjects = myObjectService.getMyObjects(id);
rearranged.setMyObjects(myObjects);
myObjects.map(myObject -> {
rearranged.setInfo(myObject.getInfo());
//set more
return myObjects;
});
return rearranged;
}
public class Rearranged {
private Flux<MyObject> myObjects;
//more attributes
}
结果:以下空对象:
{
"information": null,
"myObjects": {
"scanAvailable": true,
"prefetch": -1
}
}
- 阻止通量并使用同步对象
private Rearranged createClientResponse(String id) {
Rearranged rearranged = new Rearranged();
List<MyObject> myObjects = myObjectService.getMyObjects(id).collectList().block();
rearranged.setMyObjects(myObjects);
rearranged.setInfo(myObjects.get(0).getInfo());
return rearranged;
}
public class Rearranged {
private List<MyObject> myObjects;
//more attributes
}
结果:收到异常块((/块第一((/块最后((是线程不支持的阻塞
实现重新排列微服务响应中的信息以响应客户端的可能性的正确方法是什么? 如何阻止通量完成?我知道当我返回"同步"对象时,一个块是可能的(就像我正在做但仍然得到异常(?
首先,你的模型不应该计算反应式流。使用普通对象或列表。
public class Rearranged {
private MyObject myObject;
}
或
public class Rearranged {
private List<MyObject> myObjects;
}
如果阻塞线程,反应堆线程将在片刻内耗尽。如果你的getMyObjects
方法只接收一个对象(如果没有,请查看注释的末尾(,那么你应该把它作为一个Mono
来处理。 然后在createClientResponse
,你必须带着Mono<Rearranged>
返回 现在,您可以使用.map
方法轻松地从一个单声道映射到另一个单声道。
private Mono<Rearranged> createClientResponse(String id) {
Mono<MyObject> myObjects = myObjectService.getMyObjects(id);
return myObjects.map(myObject -> {
retrun new Rearranged(myObject)
//create the proper object here
});
}
如果需要更多对象,可以使用相同的方法,例如collectList()
将元素从Flux<>
收集到Mono<List<>>
中,那么也可以接受相同的方法。