使用占位符项完成 Uni,如果在截止时间之前未生成任何项,则无需取消处理管道



我有一个异步操作管道,需要很长时间才能完成,我想 如果管道在截止时间之前未生成项,则让 Uni 继续使用占位符项,如下所示:

Uni<Item> u = processingPipeline();
return u.ifNoItem().after(Duration.of(10, SECONDS)).recoverWith(placeholderItem);

但是,当使用此方法时,处理管道将被取消,操作链将被中断。 是否可以在截止日期到期时使用占位符项完成 Uni,而无需取消处理管道?我希望即使错过了最后期限,管道也能坚持到底。 如果有帮助,Vert.x 可用。

谢谢

编辑:

这是我到目前为止尝试过的:

Uni<Item> u = processingPipeline();
return Uni.createFrom().emitter(emitter -> {
vertx.executeBlocking(handler -> {
u.subscribe().with(emitter::complete);
}, resultHandler -> {});
vertx.setTimer(TimeUnit.SECONDS.toMillis(10), timerId -> {
emitter.complete(placeholderItem);
});
});

当处理在截止日期之前完成时,这工作正常,但如果截止日期到期并且计时器被触发,则在发出占位符项时会崩溃并javax.enterprise.context.ContextNotActiveException,并且应用程序似乎陷入某种死锁。

编辑 2

事实证明,我的大部分问题实际上是由于Hibernate的不当使用引起的。在对Hibernate事务的管理方式进行了一些重构之后,所有随机死锁和其他问题都消失了。

这似乎是对原始问题最优雅的解决方案,即如何在不中断处理的情况下使用虚拟项目进行响应,除非它在截止日期之前完成:

Uni<Item> u = processingPipeline();
return Uni.createFrom().emitter(emitter -> {
u.subscribe().with(emitter::complete);
vertx.setTimer(TimeUnit.SECONDS.toMillis(10), timerId -> {
emitter.complete(placeholderItem);
});
});

管道的取消是意料之中的,因为ifNoItem().after(duration)触发TimeoutException。因此,此异常作为失败传播,并且此特定点的上游被取消,这符合反应流语义。

recoverWith是一个故障恢复运算符,以及订阅此运算符后

的任何内容。你可能想要查看recoverWithUni,您将在其中提供Uni作为恢复,并且Uni将捕获在此特定超时点失败后可能重新订阅的管道的其余部分。

希望有帮助。

相关内容

  • 没有找到相关文章

最新更新