我有一个异步操作管道,需要很长时间才能完成,我想 如果管道在截止时间之前未生成项,则让 Uni 继续使用占位符项,如下所示:
Uni<Item> u = processingPipeline();
return u.ifNoItem().after(Duration.of(10, SECONDS)).recoverWith(placeholderItem);
但是,当使用此方法时,处理管道将被取消,操作链将被中断。 是否可以在截止日期到期时使用占位符项完成 Uni,而无需取消处理管道?我希望即使错过了最后期限,管道也能坚持到底。 如果有帮助,Vert.x 可用。
谢谢
编辑:
这是我到目前为止尝试过的:
Uni<Item> u = processingPipeline();
return Uni.createFrom().emitter(emitter -> {
vertx.executeBlocking(handler -> {
u.subscribe().with(emitter::complete);
}, resultHandler -> {});
vertx.setTimer(TimeUnit.SECONDS.toMillis(10), timerId -> {
emitter.complete(placeholderItem);
});
});
当处理在截止日期之前完成时,这工作正常,但如果截止日期到期并且计时器被触发,则在发出占位符项时会崩溃并javax.enterprise.context.ContextNotActiveException
,并且应用程序似乎陷入某种死锁。
编辑 2
事实证明,我的大部分问题实际上是由于Hibernate的不当使用引起的。在对Hibernate事务的管理方式进行了一些重构之后,所有随机死锁和其他问题都消失了。
这似乎是对原始问题最优雅的解决方案,即如何在不中断处理的情况下使用虚拟项目进行响应,除非它在截止日期之前完成:
Uni<Item> u = processingPipeline();
return Uni.createFrom().emitter(emitter -> {
u.subscribe().with(emitter::complete);
vertx.setTimer(TimeUnit.SECONDS.toMillis(10), timerId -> {
emitter.complete(placeholderItem);
});
});
管道的取消是意料之中的,因为ifNoItem().after(duration)
触发TimeoutException
。因此,此异常作为失败传播,并且此特定点的上游被取消,这符合反应流语义。
recoverWith
是一个故障恢复运算符,以及订阅此运算符后
的任何内容。你可能想要查看recoverWithUni
,您将在其中提供Uni
作为恢复,并且Uni
将捕获在此特定超时点失败后可能重新订阅的管道的其余部分。
希望有帮助。