是否可以在Mono/Flux中创建动态过滤器/谓词



该应用程序是一个简单的处理程序——读取数据、过滤数据、处理和写入过滤后的数据。

以下是一个运行时没有任何问题的简单代码:

void run() {
Flux.interval(Duration.ofMillis(200))
.filter(value -> getPredicate().test(value))
.flatMap(this::writeData)
.subscribe();
}
private Predicate<Long> getPredicate() {
return value -> value % 2 == 0;
}

是否可以有动态谓词,该谓词将通过定期请求从远程web服务中检索?如果可能-如何使用Mono<谓词>interface.filter((并保持其不阻塞

例如,将getPredicate((替换为以下内容:

private Mono<Predicate<Long>> getPredicateFromRemoteServer() {
return webClient.get()
.uri("/filters/1")
.retrieve()
.bodyToMono(Filter.class)
.map(this::mapToPredicate)
.cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
}
private Predicate<Long> mapToPredicate(Filter filter) {
// here will be converting filter object into predicate
return value -> value > 5;
}

理想情况下,我希望避免缓存(Duration.Minutes(10((,因为过滤器可以每分钟或每天更新。。。一旦过滤器更新,我的服务就会收到通知,但我没有找到从外部使缓存无效的方法,这就是为什么Duration.Minutes(10(用于一些近似的无效。

好吧,也许您可以用不同的方式编写管道。不希望每次通过调用getPredicateFromRemoteServer()来处理流中的项时都返回一个新的Predicate,而是可以将函数本身作为谓词。从流中传递您正在处理的值,使其返回带有答案的Mono<Boolean>,并在管道中的filterWhen管道中使用该值。

例如,有点像这样:

private Mono<Boolean> isWithinThreshold(int value) {
return webClient.get()
.uri("/filters/1")
.retrieve()
.bodyToMono(Filter.class)
.map(filter -> filter.threshold <= value)
.cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
}

然后在你的主管道中,你可以做:

Flux.interval(Duration.ofMillis(200))
.filterWhen(value -> isWithinThreshold(value))
.flatMap(this::writeData)
.subscribe();
}

最新更新