如何组合不同的单声道,并使用与错误处理的组合结果?



我有一个场景,我需要使用不同的单声道,它可以返回我的错误,并设置映射值为空,如果返回错误。例:

Mono<A> a=Some api call;
Mono<A> b=Some api giving error;
Mono<A> c=Some api call;

现在我想将结果响应设置为map

Map<String,A> m=new HashMap<>();
m.put("a",a);
m.put("b",null);
m.put("c",c);

有谁能告诉我如何以反应性的非阻塞方式完成所有这些吗?我尝试压缩,但它不会执行,如果任何api返回错误或如果我使用onErrorReturn(null)。

Thanks in advance

要解决您的问题,您将不得不使用一些技巧。问题是:

  • 给出空单声道或以错误结束的单声道取消压缩操作(来源:mono #zip javadoc)
  • 响应流不允许空值(来源:响应流规范,表2:订户,项目号13)

另外,请注意,在散列映射中添加空值与取消与该键关联的任何先前值相同(在更新现有映射的情况下,这一点很重要)。

现在,为了绕过您的问题,您可以添加一个抽象层,并将您的值包装在域对象中。可以有一个对象表示查询,另一个对象表示有效结果,最后一个对象将反映错误。有了这些,你就可以设计出非空值的发布者。这是函数式编程中经常使用的技术:常见错误是(一个可能的)结果值的一部分。

现在,让我们看看从多个Monos创建新Map的示例:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
public class BypassMonoError {
/**
* An object identified by a key. It serves to track which key to associate to computed values
* @param <K> Type of the key
*/
static class Identified<K> {
protected final K id;
Identified(K id) {
this.id = id;
}
public K getId() {
return id;
}
}
/**
* Describe the result value of an operation, along with the key associated to it.
*
* @param <K> Type of the identifier of the result
* @param <V> Value type
*/
static abstract class Result<K, V> extends Identified<K> {
Result(K id) {
super(id);
}
/**
*
* @return Computed value on success, or null if the operation has failed. Note that here, we cannot tell from
* a success returning a null value or an error
*/
abstract V getOrNull();
}
static final class Success<K, V> extends Result<K, V> {
private final V value;
Success(K id, V value) {
super(id);
this.value = value;
}
@Override
V getOrNull() {
return value;
}
}
static final class Error<K, V> extends Result<K, V> {
private final Exception error;
Error(K id, Exception error) {
super(id);
this.error = error;
}
@Override
V getOrNull() {
return null;
}
public Exception getError() {
return error;
}
}
/**
* A request that can asynchronously generate a result for the associated identifier.
*/
static class Query<K, V> extends Identified<K> {
private final Mono<V> worker;
Query(K id, Mono<V> worker) {
super(id);
this.worker = worker;
}
/**
* @return The operator that computes the result value. Note that any error is silently wrapped in an
* {@link Error empty result with error metadata}.
*/
public Mono<Result<K, V>> runCatching() {
return worker.<Result<K, V>>map(success -> new Success<>(id, success))
.onErrorResume(Exception.class, error -> Mono.just(new Error<K, V>(id, error)));
}
}
public static void main(String[] args) {
final Flux<Query<String, String>> queries = Flux.just(
new Query("a", Mono.just("A")),
new Query("b", Mono.error(new Exception("B"))),
new Query("c", Mono.delay(Duration.ofSeconds(1)).map(v -> "C"))
);
final Flux<Result<String, String>> results = queries.flatMap(query -> query.runCatching());
final Map<String, String> myMap = results.collectMap(Result::getId, Result::getOrNull)
.block();
for (Map.Entry<String, String> entry : myMap.entrySet()) {
System.out.printf("%s -> %s%n", entry.getKey(), entry.getValue());
}
}
}

注意:在上面的例子中,我们静默地忽略任何发生的错误。但是,在使用flux时,您可以测试结果是否为错误,如果是错误,您可以自由地设计自己的错误管理(log, fail-first, send in另一个flux等)。

这个输出:

a -> A
b -> null
c -> C

最新更新