Vertx网络客户端响应主体的反应流发布者



我正在尝试为Vert.x web客户端编写一个包装器,以便使用来自reactivestreams:的Publisher从服务器加载响应主体

import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;
interface Storage {
Publisher<ByteBuffer> load(String key);
}
class WebStorage implements Storage {
private final WebClient client;
public WebStorage(final WebClient client) {
this.client = client;
}
@Override
public Publisher<ByteBuffer> load(final String key) {
return client.get(String.format("https://myhost/path?query=%s", key))
.rxSend()
.toFlowable()
.map(resp -> ByteBuffer.wrap(resp.body().getBytes()));
}
}

这个解决方案是不正确的,因为它使用getBytes()调用以阻塞的方式读取所有正文字节。

是否可以通过块读取Vert.xWebClient的响应并将其转换为Publisher(或RxFlowable(?

Vert.x Web客户端不是为流式传输响应体而设计的。它通过设计缓冲内容。

如果您想流式传输内容,可以使用更灵活的底层HTTP客户端。

我想你可以使用ByteCodec.pipe:

import io.reactivex.Flowable;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.streams.WriteStream;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import org.reactivestreams.Publisher;
import java.nio.ByteBuffer;
interface Storage {
Publisher<ByteBuffer> load(String key);
}
class WebStorage implements Storage {
private final Vertx vertx = Vertx.vertx();
private final WebClient client;
public WebStorage(final WebClient client) {
this.client = client;
}
@Override
public Publisher<ByteBuffer> load(final String key) {
final ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(vertx.getDelegate());
client.get(String.format("https://myhost/path?query=%s", key))
.as(BodyCodec.pipe(WriteStream.newInstance(stream)))
.rxSend().subscribe();
return Flowable.fromPublisher(stream).map(buffer -> ByteBuffer.wrap(buffer.getBytes()));
}
}

最新更新