我对SpringWebflux还是个新手,Mono上的flatMap似乎不起作用。我有以下函数,并调用kafkaPublisher。publishToTopic不起作用。我插入print语句来测试它是否打印了任何内容,而它甚至不执行print语句publishToTopic返回Mono<Void>
。
private Mono<Void> test(Long gId, UUID pId) {
Mono<UUID> nId = pDao.findNId(pId);
Mono<List<String>> channels = nId.flatMapMany(pDao::findChannels).collectList();
return Mono.zip(nId, channels)
.flatMap(t -> {
System.out.println(t.getT1());
return kafkaPublisher.publishToTopic(gId, t.getT1().toString(), t.getT2());
});
}
如果在flatMap
上调用.block
,则会调用它,如下所示。
private Mono<Void> test(Long gId, UUID pId) {
Mono<UUID> nId = pDao.findNId(pId);
Mono<List<String>> channels = nId.flatMapMany(pDao::findChannels).collectList();
Mono.zip(nId, channels)
.flatMap(t -> {
System.out.println(t.getT1());
return kafkaPublisher.publishToTopic(gId, t.getT1().toString(), t.getT2());
}).block();
return Mono.empty();
}
我发现了我的错误。在调用这个test
方法的函数中,我没有在任何地方使用test
的结果。这是我用来调用test
的代码
public Mono<Void> saveNew(NewPre pre) {
preDao.insert(pre)
.flatMap(p -> test(p.pId(), p.nId()));
return Mono.empty();
}
我把它改成了following,它起作用了。
public Mono<Void> saveNew(NewPre pre) {
return preDao.insert(preference)
.flatMap(p -> test(p.p(), p.n())
.then(Mono.empty()));
}
flatMap无限期挂起,通过将异步操作转换为未来对象来关闭异步操作的最佳方式。
public Mono<Void> saveNew(NewPre pre) {
return preDao.insert(preference)
.flatMap({
p -> test(p.p(), p.n())
}).toFuture();
}