Webflux生产者-消费者问题(webClient)



嗨,我对WebFlux和背压有问题:

Flux.range(0, 100)
.flatMap((Integer y) -> {
return reallySlowApi(); 
})
.doOnEach((Signal<String> x1) -> {
log("next-------" );
})
.subscribeOn(Schedulers.elastic())
.subscribe()
;

如何将通话限制在每5秒一次。注意:只有reallySlowApi可以修改。

private Mono<String> reallySlowApi() {
return webClient
.get()
.retrieve()
.bodyToMono(String.class);
}

编辑:我知道delayElements,但如果Api变得更慢,它不会解决问题。我需要使用reallySlowApi的最佳方式。

一种方法是使用delayElements((

public void run() {
Flux.range(0, 100)
.delayElements(Duration.ofSeconds(5)) // only emit every 5 seconds
.flatMap(y -> reallySlowApi())
.doOnNext(x1 -> System.out.println("next-------"))
.blockLast(); // subscribe AND wait for the flux to complete
}
private Mono<String> reallySlowApi() {
return Mono.just("next");
}

您也可以使用Flux.interval((加上take((来限制迭代次数。

Flux.interval(Duration.ofSeconds(5))
.take(100)

请注意,您的示例中的subscribeOn并没有部分地执行任何操作,因为subscribe操作应用于0-100范围的生成,而0-100范围不是阻塞的。

您可以在Web客户端代码中使用重试机制

.doOnError(error -> handleError(error.getMessage()))
.timeout(Duration.ofSeconds(ServiceConstants.FIVE))
.retryWhen(
Retry.backoff(retryCount, Duration.ofSeconds(ServiceConstants.FIVE))
.filter(throwable -> throwable instanceof TimeoutException)
)

只是把我找到的解决方案放在这里。WebFlux在映射响应时,我们可以通过并发参数来解决这个问题。

flatMap(映射程序,并发(

.flatMap((Integer y) -> {
return reallySlowApi(); 
} , 3)

最新更新