我试图在Quarkus中使用ReactiveMongoClient从MongoDB读取集合数据。我得到多,并试图映射到我的自定义对象和准备列表,并转换成地图<长,列表>offersMapGroubyCustomerId但运气不好。谁来帮帮我吧。我在Quarkus Reactive Programming的调度程序中尝试过这样做。
代码片段:
@ApplicationScoped
public class OfferScheduler {
@Inject
ReactiveMongoClient reactiveMongoClient;
@Scheduled(cron = "{cron.expr}")
public void publishPropositions() {
getOffersCollection()
.aggregate(pipeline())
.map(this::mapOffer)
.collect()
.with(Collectors.groupingBy(Offer::getMemberId))
.onItem().transformToUni(this::iterateOfferMap);
}
private Uni<Void> iterateOfferMap(Map<Long, List<Offer>> item) {
// I need to iterate this map here
return Uni.createFrom().nullItem();
}
private Offer mapOffer(Document document) {
return Offer.builder()
.memberId(document.getLong(MEMBER_ID))
.offerId(document.getLong(OFFER_ID))
.remainingUsage(document.getInteger(REMAINING_USAGE))
.offerName(document.getString(PROPOSITION_TYPE))
.status(document.getString(STATUS))
.build();
}
private ReactiveMongoCollection<Document> getOffersCollection() {
return reactiveMongoClient.getDatabase("database").getCollection("offer");
}
}
您可以使用Uni#call
(或Uni#chain
):
@Scheduled(cron = "{cron.expr}")
@NonBlocking // I think you might need this annotation but I haven't tested it
public void publishPropositions() {
getOffersCollection()
.aggregate(pipeline())
.map(this::mapOffer)
.collect()
.with(Collectors.groupingBy(Offer::getMemberId))
.call(this::iterateOfferMap);
}
private Uni<Void> iterateOfferMap(Map<Long, List<Offer>> item) {
// Do something with the map
return Uni.createFrom().voidItem();
}
但是publishPropositions
方法不是反应性的,您确定对于这个用例MongoClient
不是更合适(而不是ReactiveMongoClient
)吗?