背景
我有一个使用RxJava从不同位置基于列表获取数据的过程。每个项都是通过不同的方法获得的(都返回Observables。)。由于要获得N个项,所以要使用的逻辑运算符是带迭代器的zip
。
问题
下面的代码按预期工作,但我需要一个try-catch
块来捕获由getBigFoo()
引发的异常(返回FooNotFoundException
),这似乎是"错误的"。其他与错误相关的运算符(如onErrorResumeNext()
和onErrorReturn()
)不包括这一点吗?
private Observable<Bar> processFoos(List<Foo> foos) {
List<Observable<? extends IBar>> observablesToZip = new ArrayList<>();
for(Foo foo : foos) {
switch (foo.getType()) {
case BIG_FOO :
try {
observablesToZip.add(getBigFoo(foo.getId()));
} catch (Exception exception) {
//do nothing - but this seems wrong
}
}
}
return Observable.zip(observablesToZip, results -> mergeFoosIntoBar(results));
}
尝试
下面的尝试似乎没有捕捉到生成的异常。我不明白为什么从技术上讲,序列中没有上游或下游项目,所以Observable.empty()
应该工作?
private Observable<Bar> processFoos(List<Foo> foos) {
List<Observable<? extends IBar>> observablesToZip = new ArrayList<>();
for(Foo foo : foos) {
switch (foo.getType()) {
case BIG_FOO :
observablesToZip.add(getBigFoo(foo.getId().onErrorResumeNext(Observable.empty()));
}
}
return Observable.zip(observablesToZip, results -> mergeFoosIntoBar(results));
}
您可能需要使用defer
。getBigFoo
不应抛出异常,而应返回错误的Observable
。因此defer
可能会帮助您修复它:
Observable<IBar> obs = Observable.defer(() -> {
try {
return getBigFoo(foo.getId());
} catch (Exception ex) {
return Observable.error(ex);
}
});
observablestoZip.add(obs);
@dwursteisen给出了正确的答案,但并不完全正确。
我的问题是我抛出了一个新的FooNotFoundException:
throw new FooNotFoundException()
但我需要做的是:
return Observable.error(new FooNotFoundException());
然后在我的Zip功能中:
observablesToZip.add(getBigFoo(foo.getId())).onExceptionResumeNext(Observable.just(null);
使用上述组合意味着,当单个Observable被解析并可能引发错误时,整个序列不会中止并返回错误。
你能让getBigFoo(foo.getId())抛出RuntimeException而不是Exception吗?。必须捕获管道上的所有异常,但不能捕获runtimeException。
看看这个愚蠢的例子
/**
* Here is a silly example how runtimeExceptions are not needed
*/
@Test
public void observableRuntimeException() {
Integer[] numbers = {0, 1, 2, 3, 4, 5};
Observable.from(numbers)
.doOnNext(number -> throwRuntimeException())
.doOnError(t -> System.out.println("Expecting illegal argument exception:" + t.getMessage()))
.subscribe();
}
private void throwRuntimeException() {
throw new RuntimeException();
}
你可以在这里看到更多的例子https://github.com/politrons/reactive