Spring webclient exchangeToFlux()不发送HTTP请求



我正在使用Spring WebFlux 5.3.6的WebClient从REST端点流响应生成text/csv内容。

我可以使用retrieve()responseSpec.bodyToFlux只对体进行流处理像这样:

WebClient.ResponseSpec responseSpec = headersSpec.retrieve();
Flux<DataBuffer> dataBufferFlux = responseSpec.bodyToFlux(DataBuffer.class);
DataBufferUtils
.write(dataBufferFlux, outputStream)
.blockLast(Duration.of(20, ChronoUnit.SECONDS));

but我想要获取content-type header并将其作为测试的一部分进行验证。上面的代码只提供对响应体的访问,而不提供对报头的访问。

我试图使用exchangeToFlux()来获得更多的控制,并访问响应头,但我所看到的是,HTTP请求从来没有做过。如果我添加一个断点到myResponse.setStatus(clientResponse.rawStatusCode());,它永远不会被击中。

下面是一个更完整的代码示例。我一直在努力寻找exchangeToFlux使用databbuffer将结果流回的任何示例。
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(5000))
.doOnConnected(conn ->
conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS)));
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
WebClient.RequestHeadersSpec<?> headersSpec = webClient
.get()
.uri("http://localhost:8080/v1/users")
.header(CONTENT_TYPE, "text/csv");

MyResponse<T> myResponse = new MyResponse<>();
CountDownLatch latch = new CountDownLatch(1);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
headersSpec.exchangeToFlux(clientResponse -> {
// Never enters here!
myResponse.setStatus(clientResponse.rawStatusCode());
myResponse.setContentType(clientResponse.headers().contentType());
latch.countDown();
if (clientResponse.statusCode() == HttpStatus.OK) {
Flux<DataBuffer> dataBufferFlux = clientResponse.bodyToFlux(DataBuffer.class);
DataBufferUtils
.write(dataBufferFlux, outputStream)
.blockLast(Duration.of(20, ChronoUnit.SECONDS));
return dataBufferFlux;
}
return Flux.empty();
});
latch.await();
return myResponse;

似乎你没有订阅从headersSpec.exchangeToFlux返回的通量。

headersSpec.exchangeToFlux(clientResponse -> {
// Never enters here!
myResponse.setStatus(clientResponse.rawStatusCode());
myResponse.setContentType(clientResponse.headers().contentType());
latch.countDown();
if (clientResponse.statusCode() == HttpStatus.OK) {
Flux<DataBuffer> dataBufferFlux = clientResponse.bodyToFlux(DataBuffer.class);
DataBufferUtils
.write(dataBufferFlux, outputStream)
.blockLast(Duration.of(20, ChronoUnit.SECONDS));
return dataBufferFlux;
}
return Flux.empty();
})
.subscribe(); // <- Subscribe is missing.

相关内容

  • 没有找到相关文章

最新更新