Spring Webflux - R2dbc : 如何在迭代结果集时运行子查询并更新值



我是反应式存储库和webflux的新手。我正在从数据库中获取数据列表,使用map()对其进行迭代以构建 DTO 类对象,在此过程中我需要运行另一个查询来获取计数值并更新相同的 DTO 对象。当我尝试如下时,计数设置为空

@Repository
public class CandidateGroupCustomRepo {
public Flux<CandidateGroupListDTO> getList(BigInteger userId){
final String sql = "SELECT gp.CANDIDATE_GROUP_ID,gp.NAME  ,gp.GROUP_TYPE   n" +
"                             ,gp.CREATED_DATE  ,cd.DESCRIPTION STATUS ,COUNT(con.CANDIDATE_GROUP_ID)n" +
"                             FROM  ........" +
"                             WHERE gp.CREATED_BY_USER_ID = :userId  GROUP BY gp.CANDIDATE_GROUP_ID,gp.NAME  ,gp.GROUP_TYPE   n" +
"                             ,gp.CREATED_DATE  ,cd.DESCRIPTION";
return dbClient.execute(sql)
.bind("userId", userId)
.map(row ->{
CandidateGroupListDTO info = new CandidateGroupListDTO();
info.setGroupId(row.get(0, BigInteger.class));
info.setGroupName(row.get(1, String.class)) ;
info.setGroupType(row.get(2, String.class));
info.setCreatedDate( row.get(3, LocalDateTime.class));
info.setStatus(row.get(4, String.class));
if(info.getGroupType().equalsIgnoreCase("static")){
info.setContactsCount(row.get(5, BigInteger.class));
}else{
getGroupContactCount(info.getGroupId()).subscribe(count ->{
System.out.println(">>>>>"+count);
info.setContactsCount(count);

});
}
return info;
}
)
.all() ;
}

Mono<BigInteger> getGroupContactCount(BigInteger groupId){
final String sql = "SELECT 3 WHERE :groupId IS NOT NULL;";
return dbClient.execute(sql)
.bind("groupId", groupId)
.map(row -> {
System.out.println(row.get(0, BigInteger.class));
return row.get(0, BigInteger.class);
}  ).one();
}

}

当我打电话给getGroupContactCount时,我正在尝试从Mono<BigInteger>中提取计数并将其设置为我的 DTO 中......sys out 正确打印计数值,但我仍然在响应中得到计数为空。

你在中间调用subscribe,这反过来又是阻塞的。订阅者通常是最终消费者,我猜您的 spring 应用程序不是,很可能最终消费者是发起调用的网页。您的服务器是生产者。

调用数据库,flatMap并返回。

return dbClient.execute(sql)
.bind("userId", userId)
.flatMap(row ->{
CandidateGroupListDTO info = new CandidateGroupListDTO();
info.setGroupId(row.get(0, BigInteger.class));
info.setGroupName(row.get(1, String.class)) ;
info.setGroupType(row.get(2, String.class));
info.setCreatedDate( row.get(3, LocalDateTime.class));
info.setStatus(row.get(4, String.class));
if(info.getGroupType().equalsIgnoreCase("static")){
return Mono.just(info.setContactsCount(row.get(5, BigInteger.class)));
} else {
return getGroupContactCount(info.getGroupId()).flatMap(count -> {
info.setContactsCount(count);
return Mono.just(info)
});
}
}).all();

如果顺序很重要,请使用map,否则尝试使用flatMap执行异步工作。

最新更新