我正在使用Spring webflux + mongodb-reactive将二进制文件(图像)保存到Mongo DB。不幸的是,spring-boot-starter-data-mongodb-reactive:2.0.5.RELEASE 不支持GridFsTemplate
功能的响应式编程。因此,我决定创建一个订阅者,它将采用所有DataBuffer部分,将它们组合并转换为InputStream
,因此GridFsTemplate::store
成为可能:
public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;
public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
this.gridFsTemplate = gridFsTemplate;
this.fileName = fileName;
dataBuffers = new ArrayList<>();
}
public void hookOnNext(DataBuffer dataBuffer) {
dataBuffers.add(dataBuffer);
request(1);
}
public void hookOnComplete() {
DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}
问题是我想返回objectId
进行进一步处理hookOnComplete
但这是无效的类型。愈。。我想从这里获得 ObjectId 的单声道,这样我就可以以响应式方式进一步处理它。在这种情况下,根据我对反应哲学的理解,我不应该使用"真正的"订阅者,而是将Flux<T>
的结果结合起来的东西,当 onComplete 时,返回Mono<R>
。项目反应器有这样的能力吗?我是响应式编程的新手,所以我可能会错过整个想法,所以请指导我如何实现这一目标。
在我之前的解决方案中,我使用block()
来结束反应链,所以我得到了ObjectId
,接下来我用新链发出对象 ID。但这肯定不是一个好的解决方案。
不幸的是,整个方法都是在自找麻烦,不建议这样做。
实现Subscriber
(即使使用BaseSubscriber
)几乎从来都不是解决方案。如果你使用的驱动程序库不支持反应式流,那么你可能应该将其包装为阻塞代码。
这样做会丢失许多功能(运行时行为、背压等),但至少如果你的订阅者实现不完美,你不会冒着炸毁整个应用程序的风险。
在你的核心库/驱动程序支持反应式流之前,你应该坚持使用 Spring MVC,它支持异步概念,在某些情况下Flux
Mono
返回类型。