使用Reactive Mongo和Web客户端的非阻塞功能方法



我有一个微服务,它使用ReactiveMongoRepository接口从数据库中读取对象。

目标是获取这些对象中的每一个,并将其推送到AWS Lambda函数(在将其转换为DTO之后)。如果lambda函数的结果在200范围内,则将该对象标记为成功,否则忽略。

在过去使用简单的Mongo存储库和RestTemplate的日子里,这将是一项微不足道的任务。然而,我正在努力理解这种反应式交易,并避免阻止。

这是我想出的代码,我知道我正在阻止webClient,但我该如何避免这种情况?

@Override
public Flux<Video> index() {
return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
// Blocking call
final HttpStatus httpStatus = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.block()
.statusCode();
if (httpStatus.is2xxSuccessful()) {
video.setIndexed(true);
}
return videoRepository.save(video);
});
}

我是从一个计划任务中调用上面的,我并不真正关心index()方法的实际结果,只关心过程中发生的事情。

@Scheduled(fixedDelay = 60000)
public void indexTask() {
indexService
.index()
.log()
.subscribe();
}

我读过很多关于这个主题的博客文章等,但它们都只是简单的CRUD操作,中间没有发生任何事情,所以不要真正向我全面介绍如何实现这些事情。

有什么帮助吗?

您的解决方案实际上非常接近。在这些情况下,您应该尝试分步骤分解反应链,并毫不犹豫地将部分转化为独立的方法以保持清晰。

@Override
public Flux<Video> index() {
Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
return unindexedVideos.flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
Mono<ClientResponse> indexedResponse = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.filter(res -> res.statusCode().is2xxSuccessful());
return indexedResponse.flatMap(response -> {
video.setIndexed(true);
return videoRepository.save(video);
});
});

我的方法,也许可读性更强一点。但我承认我没有运行它,所以不能100%保证它会运行。

public Flux<Video> index() {
return videoRepository.findAll()
.flatMap(this::callLambda)
.flatMap(videoRepository::save);
}
private Mono<Video> callLambda(final Video video) {
SearchDTO searchDTO = new SearchDTO(video);
return webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO))
.exchange()
.map(ClientResponse::statusCode)
.filter(HttpStatus::is2xxSuccessful)
.map(t -> {
video.setIndexed(true);
return video;
});
}

最新更新