如何创建反应式"中间订阅者"?如何将所有助焊剂<T>部件和它们的工艺结果进一步组合到链中(转换为单声道<R>)?



我正在使用Spring webflux + mongodb-reactive将二进制文件(图像)保存到Mongo DB。不幸的是,spring-boot-starter-data-mongodb-reactive:2.0.5.RELEASE 不支持GridFsTemplate功能的响应式编程。因此,我决定创建一个订阅者,它将采用所有DataBuffer部分,将它们组合并转换为InputStream,因此GridFsTemplate::store成为可能:

public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;
public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
this.gridFsTemplate = gridFsTemplate;
this.fileName = fileName;
dataBuffers = new ArrayList<>();
}
public void hookOnNext(DataBuffer dataBuffer) {
dataBuffers.add(dataBuffer);
request(1);
}
public void hookOnComplete() {
DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}

问题是我想返回objectId进行进一步处理hookOnComplete但这是无效的类型。愈。。我想从这里获得 ObjectId 的单声道,这样我就可以以响应式方式进一步处理它。在这种情况下,根据我对反应哲学的理解,我不应该使用"真正的"订阅者,而是将Flux<T>的结果结合起来的东西,当 onComplete 时,返回Mono<R>。项目反应器有这样的能力吗?我是响应式编程的新手,所以我可能会错过整个想法,所以请指导我如何实现这一目标。

在我之前的解决方案中,我使用block()来结束反应链,所以我得到了ObjectId,接下来我用新链发出对象 ID。但这肯定不是一个好的解决方案。

不幸的是,整个方法都是在自找麻烦,不建议这样做。

实现Subscriber(即使使用BaseSubscriber)几乎从来都不是解决方案。如果你使用的驱动程序库不支持反应式流,那么你可能应该将其包装为阻塞代码。

这样做会丢失许多功能(运行时行为、背压等),但至少如果你的订阅者实现不完美,你不会冒着炸毁整个应用程序的风险。

在你的核心库/驱动程序支持反应式流之前,你应该坚持使用 Spring MVC,它支持异步概念,在某些情况下FluxMono返回类型。

最新更新