我有一个简单的用例。这是为了进行rest调用,查询mongo,然后将任意大的数据流返回给客户端,所有这些都使用反应流类型的背压管理。
使用SpringWebFlux和Reactor可以很容易地实现这一点。我现在正试图使用vert.x来实现同样的目标,作为实现简易性的比较。
在发现vert.x mongo客户端缺乏任何管理背压的支持后,我现在正尝试使用WebFlux mongo客户机,然后通过vert.x HttpResponse将数据泵送回来,如以下代码所示:
public class MyMongoVerticle extends AbstractVerticle {
ReactiveMongoOperations operations;
public void start() throws Exception {
final Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.get("/myUrl").handler(ctx -> {
// WebFlux mongo operations returns a ReactiveStreams compatible entity
Flux<Document> mongoStream = operations.findAll(Document.class, "myCollection");
ReactiveReadStream rrs = ReactiveReadStream.readStream();
// rrs is ReactiveStream streams subscriber
mongoStream.subscribe(rrs);
// Pump pumps the rrs (ReactiveReadStream) to the HttpServerResponse (ReactiveWriteStream)
Pump pump = Pump.pump(rrs, ctx.response());
pump.start();
});
vertx.createHttpServer().requestHandler(router::accept).listen(8777);
}
}
我遇到的问题是HttpServerResponse实现了ReactiveWriteStream<缓冲区>因此期望的是缓冲区而不是文档流。结果是一个ClassCaseException。
我的问题是如何将此文档流转换为ReactiveWriteStream<缓冲区>?也许还有另一种更好的方法可以做到这一点,所以我对如何实现这一点的其他建议持开放态度。
Pump
不适用于您,因为它目前不支持转换。你将不得不自己实施泵。幸运的是,这应该不会太难:
Flux<Document> mongoStream = operations.findAll(Document.class, "myCollection");
ReactiveReadStream<Document> rrs = ReactiveReadStream.readStream();
mongoStream.subscribe(rrs);
HttpServerResponse outStream = ctx.response();
// Changes start here
rrs.handler(d -> {
if (outStream.writeQueueFull()) {
outStream.drainHandler((s) -> {
rrs.resume();
});
rrs.pause();
}
else {
outStream.write(d.toJson());
}
}).endHandler(h -> {
outStream.end();
});
请注意,我并不认为这会比"原生"WebFlux实现更有效。
此外,本例中的JSON将被破坏,因为我没有将其包装在正确的JSON数组中