使用 Spring WebClient 响应式读取行



TLDR:如何使用Spring WebClient(reactive one)逐行处理GET响应?

详:

  • 远程服务器返回大小高达 20 Gb 的响应
  • 我的服务单独解析行(每行编码为 UTF8)并流式传输结果(跳过 99% 的行)
  • 我不想将整个响应加载到内存中,例如,我想逐行解析服务器更新。

不幸的是,我没有找到任何将Flux<ByteBuffer>转换为Flux<String>的解决方案(通过在行尾拆分)。

问题:是否有任何嵌入式转换器/解码器可以做到这一点?

可能的解决方案:

  • 创建临时缓冲区(最初为空)
  • 对于每个输入缓冲区: 将临时缓冲区
    • 附加到新的重新创建的临时缓冲区。
    • 尝试从此缓冲区读取单行(例如,读取直到行尾):
    • 如果还有剩余字节 - 返回此字符串并重复读行
    • 如果缓冲区已完成(例如,那里没有行分隔符):只需将这些字节复制到临时缓冲区即可。
  • 在最后一个缓冲区之后:读取临时缓冲区直到结束。

另外:您不能只将输入缓冲区转换为字符串,因为某些 utf8 字符可以从缓冲区 N 开始,然后在缓冲区 N+1 处继续。

下面的代码可以工作,但这是完全同步的代码(可能只有预取功能)。它使用Apache Http组件。

HttpClientBuilder.create().build().use { client ->
val responseHandler = ResponseHandler { response ->
response.entity.content.use { content ->
content.bufferedReader().use { buffered ->
// create class, which can process each line. 
val processor = StreamedLinesProcessor<TResult>()
do {
val nextLine = buffered.readLine()
val needContinue = processor.processNextLine(nextLine)
} while (needContinue)
processor.getResult()
}
}
}
client.execute(HttpGet(url.toString()), responseHandler)
}

我还没有找到嵌入的,我正在使用类似于以下代码的东西。我假设您使用单字符n行结尾,并且n在此类看到的任何编码中始终编码为单字节字符,例如不支持 UTF16。

Mono<ResponseEntity<Flux<String>>>  = webclient
.post()
.uri(...)
.contentType(...)
.bodyValue(...)
.retrieve()
.toEntityFlux(new LineBodyExtractor());

import lombok.NonNull;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.util.MimeType;
import org.springframework.web.reactive.function.BodyExtractor;
import reactor.core.publisher.Flux;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Optional.of;

/**
* Only supports single char separators and charsets
**/
public class LineBodyExtractor implements BodyExtractor<Flux<String>, ClientHttpResponse> {
private static final String SEPARATOR_CHAR = "n";
@NonNull
@Override
public Flux<String> extract(ClientHttpResponse inputMessage, @NonNull Context ignored) {
var charSet = of(inputMessage.getHeaders())
.map(HttpHeaders::getContentType)
.map(MimeType::getCharset)
.orElse(UTF_8);
var separatorBytes = SEPARATOR_CHAR.getBytes(charSet);
if (separatorBytes.length != 1) {
throw new IllegalStateException("Charset %s doesn't encode separator %s to a single byte and is unsupported".formatted(charSet, SEPARATOR_CHAR));
}
byte separator = separatorBytes[0];
return inputMessage.getBody()
.flatMap(buf -> this.splitBy(separator, buf))
.windowUntil(buf -> buf.indexOf(i -> i == separator, 0) == 0)
// need to join byte buffers before calling to string, UTF8 is multi byte
.flatMap(DataBufferUtils::join)
.map(buf -> buf.toString(charSet));
}
Flux<DataBuffer> splitBy(Byte separator, DataBuffer buf) {
if (buf.capacity() <= 0) { // e.g. an empty trailing line should still emit an event
return Flux.just(buf);
}
Flux<DataBuffer> r = Flux.empty();
int separatorIndex;
while ((separatorIndex = buf.indexOf(i -> i == separator, 0)) >= 0) {
r = Flux.concat(r, Flux.just(buf.split(separatorIndex), buf.split(1)));
}
return Flux.concat(r, Flux.just(buf)).filter(b -> b.capacity() > 0);
}
}

相关内容

  • 没有找到相关文章

最新更新