反应式编程:SpringWebFlux:如何构建微服务调用链



Spring Boot应用程序:

@RestController接收以下有效载荷:

{
"cartoon": "The Little Mermaid",
"characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}

我需要以以下方式处理它:

  1. 获取每个角色名称的唯一Id:对"卡通人物"微服务进行HTTP调用,该服务按名称返回Id
  2. 转换控制器接收到的数据:将角色名称替换为上一步从"卡通人物"微服务接收的适当id。{ "cartoon": "The Little Mermaid", "characterIds": [1, 2, 3, 4] }

  3. 用转换后的数据向"卡通数据库"微服务发送HTTP POST请求。

  4. 将响应从"卡通数据库"映射到作为控制器返回值的内部表示

我遇到的问题:

我需要使用Reactive Programming(非阻塞\异步处理(与Spring WebFlux(Mono|Flux(和Spring Reactive WebClient的范例来实现所有这些步骤,但我对该堆栈没有任何经验,我试图尽可能多地阅读它,加上在谷歌上搜索了很多,但仍然有一堆未回答的问题,例如:

Q1。我已经配置了反应式webClient,它向"卡通人物"微服务发送请求:

public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
return WebClient.builder().baseUrl("http://cartoon-characters").build()
.get()
.uri("/character/{characterName}", characterName)
.retrieve()
.bodyToMono(Integer.class);
}

正如你所看到的,我有一个卡通人物的名字列表,对于每个名字我都需要调用getCartoonCharacterIdbyName(String name)方法,我不确定串行调用它的正确选项,相信正确的选项:并行执行。

编写以下方法:

public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
.flatMap(this::getCartoonCharacterIdbyName);
return StreamSupport.stream(flux.toIterable().spliterator(), false)
.collect(Collectors.toList());

}

但我有疑问,这段代码执行并行WebClient,而且代码调用了阻塞线程的flux.toIterable(),所以在这个实现中,我失去了非阻塞机制。

我的假设正确吗?

我需要如何将它重写为具有并行性和非阻塞性?

Q2。当我们使用Flux<Integer>characterId操作,但不使用characterId的List<Integer>时,在技术上是否可以以反应式转换控制器接收的输入数据(我的意思是用id替换名称(?

Q3。是否有可能不仅获得转换后的Data对象,还获得Mono<>在步骤2之后,另一个WebClient是否可以在步骤3中使用?

事实上,这是一个很好的问题,因为当涉及到链接微服务时,理解WebFlux或项目反应器框架需要几个步骤。

第一个是认识到WebClient应该接收发布者并返回发布者。将此推断为4种不同的方法签名,以帮助思考。

  • 单声道->单声道
  • 通量->通量
  • 单声道->通量
  • 通量->单声道

当然,在任何情况下,它都只是Publisher->Publisher,但在您更好地理解事情之前,请不要这样做。前两个是显而易见的,您只需要使用.map(...)来处理流中的对象,但您需要学习如何处理后两个。如上所述,从Flux->Mono可以用.collectList().reduce(...)完成。从单声道->通量似乎通常是用.flatMapMany.flatMapIterable或其一些变体来完成的。可能还有其他技术。您永远不应该在任何WebFlux代码中使用.block(),如果您尝试这样做,通常会出现运行时错误

在你的例子中,你想去

  • (单声道->通量(->(通量->通量(->[通量->通量]

正如你所说,你想要

  • 单声道->通量->通量

第二部分是了解链接流。你可以做

  • p3(p2(p1(对象((

这将链接p1->p2->p3,但我总是发现制作"服务层"更容易理解。

  • o2=p1(对象(
  • o3=p2(o2(
  • 结果=p3(o3(

这段代码更容易阅读和维护,并且随着一些成熟度的提高,您会理解该语句的价值。

我对你的例子唯一的问题是用WebClient作为@RequestBody来做Flux<String>。不起作用。请参阅WebClient bodyToFlux(String.class(了解字符串列表中并没有单独的值。除此之外,它是一个非常简单的应用程序。当您调试它时,您会发现它在到达Flux<Integer> ids = mapNamesToIds(fn)行之前到达.subscribe(System.out::println)行。这是因为流程在执行之前就已经设置好了。需要一段时间才能理解这一点,但这是项目反应器框架的要点。

@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception {
String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
characters = Arrays.asList( new CartoonCharacter[] {
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"), 
new CartoonCharacter(names[1].hashCode(), names[1], "Human"), 
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"), 
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")} 
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
}
@Bean
WebClient localClient() {
return WebClient.create("http://localhost:8080/demo/");
}
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
}
//  Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
}
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
}
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
return names.map(name->name.getString().hashCode());
}
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
return ids.map(characters::get);
}
}

还有:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
private String string;
}
@Data
@Builder
public class CartoonRequest {
private String cartoon;
private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
Integer id;
String name;
String species;
}

最新更新