Mutiny -将完成传播到父多轮询



我正在使用Mutiny编写一个小轮询机制,我学习库的一部分,当发现结果时,我有点困在取消轮询。我试着使用tick(),我得出的结果看起来像

Multi.createFrom().ticks().every(Duration.ofSeconds(5))
.onItem().transformToMultiAndMerge(tick -> {
System.out.println("Tick:" + tick);
return Multi.createFrom()
.<Transaction>emitter(
emitter -> {
service.getTransactions().toMulti()
.onItem().transformToMultiAndMerge(
transactions -> Multi.createFrom().iterable(transactions))
.subscribe().with(transaction -> {
if (!verification.isOngoing()) {
emitter.fail(new TransactionVerificationException());
} else {
boolean transactionFound = transaction.getAmount().stream().anyMatch(
amount -> amount.getQuantity()
.equals("test"));
if (transactionFound) {
emitter.emit(transaction);
emitter.complete();
} 
}
});
});
})
.subscribe()
.with(transaction -> log.info(transaction),
x -> x.printStackTrace());

这里的问题是,Multi from ticks()永远运行,我认为取消它的唯一方法是以某种方式传播发射器已经完成。这里的情况是我想要发出,并且只在满足某些条件时才处理。

你的方法几乎是正确的,

  • 不需要从现有的Multi(或转换后的Uni)中创建自定义MultiEmitter,因为您可以在源service#getTransaction结果之上利用不同的Multi操作符
  • 您错过了EmptyMulti源,它将自动完成下游订阅者链,您可以使用它来表示缺乏有效项目(即Transaction)
  • 您需要选择第一个有效的项目(非null),然后将您的Multi转换为Uni,这将导致上游订阅被自动取消,一旦收到项目

下面是流管道的样子:

Multi.createFrom()
.ticks()
.every(Duration.ofSeconds(5))
.onItem()
// flat map the ticks to the `service#getTransactions` result
.transformToMultiAndMerge(tick -> service.getTransactions()
.toMulti()
.onItem()
// flatten Collection<Transaction> to Multi<Transaction>
.transformToIterable(Function.identity())
.onItem()
.transformToMultiAndMerge(transaction -> {
if (!verification.isOngoing()) {
return Multi.createFrom().failure(new TransactionVerificationException());
} else {
boolean transactionFound = transaction.getAmount()
.stream()
.anyMatch(amount -> amount.getQuantity().equals("test"));
if (transactionFound) {
return Multi.createFrom().item(transaction);
} else {
return Multi.createFrom().empty();
}
}
})
)
.select()
.first(Objects::nonNull)
.toUni()
.subscribe()
.with(transaction -> log.info(transaction), x -> x.printStackTrace());

最新更新