如何处理通量滤波器When()的负情况



如果filterWhen((为false,如何进行异步非阻塞调用?

我有一堆来自SQS的物品。在某些情况下,我需要过滤掉它们,但过滤后,我仍然需要向第三方api发送一个事件,其中包含项目详细信息。

代码如下:

getItems()
.map(this::doSomething)
.flatMap(item -> callService1(item).thenReturn(item))
.filterWhen(item -> check(item))
.flatMap(item -> callService2(item))

我想要实现的是:

getItems()
.map(this::doSomething)
.flatMap(item -> callService1(item).thenReturn(item))
.filterWhen(item -> check(item))
.ifFalse(item -> 3rdPartyApi(item))
.flatMap(item -> callService2(item))

有没有任何运算符可以用来用ifFalse((或使用其他方法来替换行?

如果是简单的if,我们可能会使用flatMap并在其中进行检查。但是它的filterWhen((和无法更改。

您必须将该侧API调用附加到用作filterWhen〃的发布服务器;条件";(建立在check(...)调用之上(当check发布者发出的结果为false:时

Mono<Boolean> check(String s) {
return Mono.fromCallable(() -> s.contains("example"));
}
Mono<Long> sideEffectOnFiltered(String s) {
return Mono.delay(Duration.ofSeconds(2))
.doOnNext(b -> System.out.println("did side effect on " + s));
}
@Test
void demonstrateIfFalse() {
Flux.just("example", "value", "example2")
.filterWhen(v -> check(v)
.flatMap(isExample -> isExample ? Mono.just(true) :
sideEffectOnFiltered(v).thenReturn(false)
)
)
.log()
.blockLast();
}

该输出:

11:03:45.640 [main] INFO  reactor.Flux.FilterWhen.1 - onSubscribe(FluxFilterWhen.FluxFilterWhenSubscriber)
11:03:45.646 [main] INFO  reactor.Flux.FilterWhen.1 - request(unbounded)
11:03:45.704 [main] INFO  reactor.Flux.FilterWhen.1 - onNext(example)
did side effect on value
11:03:47.726 [parallel-1] INFO  reactor.Flux.FilterWhen.1 - onNext(example2)
11:03:47.730 [parallel-1] INFO  reactor.Flux.FilterWhen.1 - onComplete()

最新更新