处理分页的WebClient响应,而无需等待和阻塞Flux中的下一页



我已经开发了一个应用程序来从第三方站点获取数据并进行处理。数据将以分页的方式获取。目前,该实现工作良好,因为它从所有页面收集所有数据,然后从中创建一个列表。由于内存问题,我们希望在数据可用时立即处理数据,而不阻止下一个请求。下面是我的实现。

public Mono<List<JsonNode>> featchData() {

return webClientHelper.fetchItems(null).expand(response -> {
List<String> headerValue = response.getHeaders().get(Constants.HEADER_ITEM_CURSOR);
if (headerValue == null || headerValue.isEmpty()) {
return Mono.empty();
}
return webClientHelper.fetchItems(headerValue.get(0));
}).flatMap(response -> Flux.fromIterable(Arrays.asList(response.getBody()))).collectList();
}

下面是我的web客户端助手方法

public Mono<ResponseEntity<JsonNode[]>> fetchItems(String headerValue) {
return webFluxConfig.getWebClient().get()
.uri("/orders/")
.accept(MediaType.APPLICATION_JSON)
.httpRequest(httpRequest -> {
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
reactorRequest.responseTimeout(Duration.ofSeconds(5));
})
.header(HEADER_ITEM_CURSOR, headerValue)
.retrieve()
.onStatus(
status -> status.value() == STATUS_CODE_401,
clientResponse -> Mono.empty()
)
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(
new ServerErrorException("Server error", response.rawStatusCode())))
.toEntity(JsonNode[].class)
.timeout(Duration.ofMillis(configuration.getBManagedTimeoutMilliseconds()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2))
.filter(ServerErrorException.class::isInstance)
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new ServerErrorException("Failed to process after max retries",
      HttpStatus.SERVICE_UNAVAILABLE.value());
}));
}

我使用JsonNode[]的原因是,响应数据将是动态属性数组。

注:HEADER_ITEM_CURSOR将包含与下一组结果相关的数据。我们需要在下一个请求中传递它,如果HEADER_ITEM_CURSOR为null,那么下一个申请就没有更多的数据了。

解决方案应该是,一旦第一页数据可用,就应该对其进行处理。获取第二页数据的调用可能会/可能不会被阻止,一旦可用,就应该将其添加到列表中进行处理。

我通过将Mono更改为Flux并订阅找到了解决方案。

public Flux<ResponseEntity<JsonNode[]>> featchData() {
return webClientHelper.fetchItems(null).expand(response -> {
List<String> headerValue = response.getHeaders().get(Constants.HEADER_ITEM_CURSOR);
if (headerValue == null || headerValue.isEmpty()) {
return Mono.empty();
}
return webClientHelper.fetchItems(headerValue.get(0));
});

}

在上面的代码中,我删除了最后一行.flatMap(response -> Flux.fromIterable(Arrays.asList(response.getBody()))).collectList()

然后在调用方法中,我开始直接处理数据,而不是收集到列表中。

public static void main(String[] args) {
featchData().subscribe( json -> Arrays.stream(json.getBody()).sequential()
.forEach(item -> System.out.println(item.getId())));
}

如果有人发现上述实施存在任何问题,请告诉我。

最新更新