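I'm new to RxJava, but I'm integrating it into a project I'm working on as a way to learn it. I've run into a question about best practices.
I have a question about how to handle onError so that it does not stop the Observable from processing.
Here's the setup:
I have a list of userIds, and for each one I want to make 2 or more network requests. If any of the network requests for a userId fails, that userId won't be updated and can be skipped. This should not prevent the other userIds from being processed. I do have a solution, but it involves nested subscribes (see the second code block). One problem I do see is that if every call fails, there is no way to short-circuit and stop the remaining requests from hitting the network, even after detecting that a certain threshold number have failed.
Is there a better way to do this?
Traditional code: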
List<String> results = new ArrayList<String>();
for (String userId : userIds) {
try {
String info = getInfo(userId); // can throw a GetInfoException
String otherInfo = getOtherInfo(userId); // can throw a GetOtherInfoException
results.add(info + ", " + otherInfo);
} catch (GetInfoException e) {
log.error(e);
} catch (GetOtherInfoException e) {
log.error(e);
}
}
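The short-circuit requirement mentioned above is easy to state in the traditional loop, which can serve as a reference for what the Rx version should do. The following is only a sketch in plain Java: `FetchException`, `Fetcher`, `fakeRequest` and the `maxFailures` parameter are hypothetical names introduced for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThresholdLoop {
    /** Hypothetical stand-in for GetInfoException / GetOtherInfoException. */
    static class FetchException extends Exception {
        FetchException(String message) { super(message); }
    }

    /** Hypothetical request abstraction: one call per user id, may fail. */
    interface Fetcher {
        String fetch(String userId) throws FetchException;
    }

    /** Mirrors the question's test helper: fails when the id contains errorNumber. */
    static String fakeRequest(String prefix, String userId, String errorNumber) throws FetchException {
        if (userId.contains(errorNumber)) {
            throw new FetchException("request failed for user " + userId);
        }
        return prefix + userId;
    }

    /** Skips users whose requests fail and stops entirely after maxFailures failed users. */
    static List<String> collect(List<String> userIds, Fetcher getInfo, Fetcher getOtherInfo, int maxFailures) {
        List<String> results = new ArrayList<>();
        int failures = 0;
        for (String userId : userIds) {
            try {
                String info = getInfo.fetch(userId);
                String otherInfo = getOtherInfo.fetch(userId);
                results.add(info + ", " + otherInfo);
            } catch (FetchException e) {
                failures++;
                if (failures >= maxFailures) {
                    break; // short-circuit: no further requests are issued
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6");
        System.out.println(collect(ids,
                id -> fakeRequest("1::: ", id, "2"),
                id -> fakeRequest("2::: ", id, "3"),
                2));
    }
}
```

With a threshold of 2, users 2 and 3 fail and the loop stops before user 4 is requested; with a larger threshold only the failing users are skipped.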
The problem:
Pseudocode:
userid -> network requests -> result
1 -> a, b -> onNext(1[a ,b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])
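Here is a working example with a list of userIds and 2 info requests for each one. If you run it, you will see that it fails (the output follows the source):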
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
public class TestMergeDelayError {
public static Observable<String> getUserIds() {
return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
}
public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {
public Subscription onSubscribe(Observer<? super String> t1) {
if (integer.contains(errorNumber)) {
t1.onError(new Exception());
} else {
t1.onNext(prefix + integer);
t1.onCompleted();
}
return Subscriptions.empty();
}
});
return observable;
}
public static void main(String[] args) {
Observable<String> userIdObservable = getUserIds();
Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {
public Observable<String> call(final String t1) {
Observable<String> info1 = getInfo("1::: ", t1, "2");
Observable<String> info2 = getInfo("2::: ",t1, "3");
return Observable.mergeDelayError(info1, info2);
}
});
t.subscribe(new Action1<String>() {
public void call(String t1) {
System.out.println(t1);
}
}, new Action1<Throwable>() {
public void call(Throwable t1) {
t1.printStackTrace();
}
},
new Action0(){
public void call() {
System.out.println("onComplete");
}
});
}
}
Output:
1::: 1
2::: 1
2::: 2
java.lang.Exception
at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
at rx.Observable.subscribe(Observable.java:241)
at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
at rx.Observable.subscribe(Observable.java:483)
Nested subscribe solution:
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
public class TestMergeDelayError {
public static Observable<String> getUserIds() {
return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
}
public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {
public Subscription onSubscribe(Observer<? super String> t1) {
if (integer.contains(errorNumber)) {
t1.onError(new Exception());
} else {
t1.onNext(prefix + integer);
t1.onCompleted();
}
return Subscriptions.empty();
}
});
return observable;
}
public static void main(String[] args) {
Observable<String> userIdObservable = getUserIds();
userIdObservable.subscribe(new Action1<String>() {
public void call(String t1) {
Observable<String> info1 = getInfo("1::: ", t1, "2");
Observable<String> info2 = getInfo("2::: ", t1, "3");
Observable.merge(info1, info2).subscribe(new Action1<String>() {
public void call(String t1) {
System.out.println(t1);
}
}, new Action1<Throwable>() {
public void call(Throwable t1) {
t1.printStackTrace();
}
},
new Action0() {
public void call() {
System.out.println("onComplete");
}
});
}
});
}
}
Output:
1::: 1
2::: 1
onComplete
java.lang.Exception
at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
at rx.Observable.subscribe(Observable.java:241)
at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
at rx.Observable.subscribe(Observable.java:483)
at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
at rx.Observable$2.onNext(Observable.java:381)
at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
at rx.Observable.subscribe(Observable.java:241)
at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
at rx.Observable.subscribe(Observable.java:367)
at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
at rx.Observable.subscribe(Observable.java:241)
at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
at rx.Observable.subscribe(Observable.java:241)
at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
at rx.Observable.subscribe(Observable.java:483)
at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
at rx.Observable$2.onNext(Observable.java:381)
at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
at rx.Observable.subscribe(Observable.java:241)
at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
at rx.Observable.subscribe(Observable.java:367)
at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete
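As you can see, only the individual userIds that failed stopped their own processing, while the rest of the userIds were still processed.
I'm just looking for advice on whether this solution makes sense and, if not, what the best practice is.
Thanks, Alex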
Since you want to ignore the error, you can try onErrorResumeNext(Observable.<String>empty()). For example,
Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
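One thing worth checking with this approach is where the recovery is applied. The sketch below is not RxJava: it uses java.util.stream as a stand-in to show the "replace a failure with an empty result" idea, and the names `requestOrEmpty` and `userOrEmpty` are hypothetical. In this model, recovering per request lets a user's other request still emit, whereas recovering per user drops the whole user.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EmptyOnError {
    /** Stand-in for the question's getInfo: fails when the id contains errorNumber. */
    static String getInfo(String prefix, String id, String errorNumber) {
        if (id.contains(errorNumber)) {
            throw new IllegalStateException("request failed for user " + id);
        }
        return prefix + id;
    }

    /** Per-request recovery: a single failed request becomes an empty stream. */
    static Stream<String> requestOrEmpty(String prefix, String id, String errorNumber) {
        try {
            return Stream.of(getInfo(prefix, id, errorNumber));
        } catch (IllegalStateException e) {
            return Stream.<String>empty();
        }
    }

    /** Per-user recovery: if either request fails, the whole user yields nothing. */
    static Stream<String> userOrEmpty(String id) {
        try {
            return Stream.of(getInfo("1::: ", id, "2"), getInfo("2::: ", id, "3"));
        } catch (IllegalStateException e) {
            return Stream.<String>empty();
        }
    }

    static List<String> perRequest(List<String> ids) {
        return ids.stream()
                .flatMap(id -> Stream.concat(requestOrEmpty("1::: ", id, "2"), requestOrEmpty("2::: ", id, "3")))
                .collect(Collectors.toList());
    }

    static List<String> perUser(List<String> ids) {
        return ids.stream().flatMap(EmptyOnError::userOrEmpty).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1", "2", "3", "4");
        System.out.println(perRequest(ids)); // users 2 and 3 each contribute one item
        System.out.println(perUser(ids));    // users 2 and 3 contribute nothing
    }
}
```

If a user should be skipped entirely when any of its requests fails, the per-user form is the one that matches that requirement in this model.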
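The best practice is to use mergeDelayError(), which combines multiple Observables into one and allows the error-free Observables to continue before the errors are propagated.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable immediately issues an onError notification and terminates. mergeDelayError, on the other hand, holds off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items. It emits those items itself, and only terminates with an onError notification once all of the other merged Observables have finished.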
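The difference between failing fast and delaying the error can be modelled in a few lines of plain Java. This is only a simplified, sequential model of the behaviour described above, not RxJava's implementation; `mergeLike`, `mergeDelayErrorLike`, `ok` and `failing` are hypothetical names, and each source emits at most one item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

class DelayErrorModel {
    /** A source that emits one value. */
    static Callable<String> ok(String value) {
        return () -> value;
    }

    /** A source that fails instead of emitting. */
    static Callable<String> failing() {
        return () -> { throw new Exception("boom"); };
    }

    /** Fail-fast model: the first failing source ends the output immediately. */
    static List<String> mergeLike(List<Callable<String>> sources) {
        List<String> events = new ArrayList<>();
        for (Callable<String> source : sources) {
            try {
                events.add(source.call());
            } catch (Exception e) {
                events.add("onError");
                return events; // remaining sources are never consulted
            }
        }
        events.add("onCompleted");
        return events;
    }

    /** Delayed-error model: every source gets its turn, the error is reported last. */
    static List<String> mergeDelayErrorLike(List<Callable<String>> sources) {
        List<String> events = new ArrayList<>();
        boolean failed = false;
        for (Callable<String> source : sources) {
            try {
                events.add(source.call());
            } catch (Exception e) {
                failed = true; // remember the failure, keep going
            }
        }
        events.add(failed ? "onError" : "onCompleted");
        return events;
    }

    public static void main(String[] args) {
        List<Callable<String>> sources = Arrays.asList(ok("a"), failing(), ok("c"));
        System.out.println(mergeLike(sources));           // prints [a, onError]
        System.out.println(mergeDelayErrorLike(sources)); // prints [a, c, onError]
    }
}
```

Note that in both models the combined result still ends with an error; delaying the error only changes how many items are delivered before it.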
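As an Rx newbie, I was also looking for a simple answer on how to handle exceptions individually and continue with the next event, but I could not find answers to what @Daniele Segato was asking. Here is a solution for the case where you don't have control:
The examples above assume you have control over the observables. That is, one way is to delay the errors until the end using mergeDelayError, and another is to use merge and have each individual Observable return a known empty Observable when it fails.
If it is a source event error, you can use lift to create another observable that handles the values of the current Observable gracefully. The SimpleErrorEmitter class simulates an unbounded stream that sometimes fails.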
Observable.create(new SimpleErrorEmitter())
// transform errors to write to error stream
.lift(new SuppressError<Integer>(System.err::println))
.doOnNext(System.out::println) // and everything else to console
.subscribe();
class SimpleErrorEmitter implements OnSubscribe<Integer> {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new FooException());
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
}
}
class SuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public SuppressError(Action1<Throwable> onError) {
this.onError = onError;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> t1) {
return new Subscriber<T>(t1) {
@Override
public void onNext(T t) {
t1.onNext(t);
}
@Override
public void onError(Throwable e) { // handle errors using a separate function
onError.call(e);
}
@Override
public void onCompleted() {
t1.onCompleted();
}
};
}
}
If it is an error in the subscriber's own processing, you can try/catch and continue gracefully:
Observable<Integer> justInts = justStrs.map((str) -> {
try {
return Integer.parseInt(str);
} catch (NumberFormatException e) {
return null;
}
});
I am still trying to find a simple way to retry, or retry after a delay, only the failed event and then continue from the next one.
Observable<String> justStrs = Observable
.just("1", "2", "three", "4", "5") // or an unbounded stream
// both these retrying from beginning
// when you delay or retry, if they are of known exception type
.retryWhen(ex -> ex.flatMap(eachex -> {
// for example, if it is a socket or timeout type of exception, try delaying it or retrying it
if (eachex instanceof RuntimeException) {
return Observable.timer(1L, TimeUnit.MICROSECONDS, Schedulers.immediate());
}
return Observable.error(eachex);
}))
// or simply retry 2 times
.retry(2) // if it is the source problem, attempt retry
.doOnError((ex) -> System.err.println("On Error:" + ex));
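For comparison, the behaviour being looked for here (retry only the failing item a bounded number of times, then move on to the next one) can be written down in plain Java. This is a sketch of the desired semantics, not an RxJava operator; `mapWithRetry` and `maxAttempts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class RetryThenSkip {
    /**
     * Applies fn to each item, retrying a failing item up to maxAttempts times.
     * An item that still fails is logged and skipped; later items are unaffected.
     */
    static <T, R> List<R> mapWithRetry(List<T> items, Function<T, R> fn, int maxAttempts) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    out.add(fn.apply(item));
                    break; // success: continue with the next item
                } catch (RuntimeException e) {
                    if (attempt == maxAttempts) {
                        System.err.println("Giving up on " + item + ": " + e);
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> parsed = mapWithRetry(
                Arrays.asList("1", "2", "three", "4", "5"),
                (String s) -> Integer.parseInt(s),
                2);
        System.out.println(parsed); // prints [1, 2, 4, 5]
    }
}
```

Unlike the retry calls above, which resubscribe to the source from the beginning, this loop never reprocesses items that already succeeded.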
Reference: https://groups.google.com/forum/#!topic/rxjava/trm2n6S4FSc
Take a look at the source of Observable.flatMap:
return merge(map(func));
If you want every possible userId to be processed, you can use a modified version of flatMap:
Observable.mergeDelayError(userIdObservable.map(userInfoFunc))
Moving on, if you say:
"If any of the network requests for a userId fails, that userId won't be updated and can be skipped"
then don't use:
return Observable.mergeDelayError(info1, info2);
because that would cause both info1 and info2 to be requested even if one of them fails.
Instead, go for:
return Observable.merge(info1, info2);
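When info1 and info2 are subscribed on the same thread, they run sequentially, so if info1 fails, info2 is never requested. Since info1 and info2 are I/O bound, I assume you want to run them in parallel: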
getInfo("1::: ", t1, "2").subscribeOn(Schedulers.io());
getInfo("2::: ",t1, "3").subscribeOn(Schedulers.io());
This should speed up your processing significantly.
The whole code:
public class TestMergeDelayError {
public static Observable<String> getUserIds() {
return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
}
public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
return Observable.create(new OnSubscribeFunc<String>() {
public Subscription onSubscribe(Observer<? super String> t1) {
if (integer.contains(errorNumber)) {
t1.onError(new Exception());
} else {
t1.onNext(prefix + integer);
t1.onCompleted();
}
return Subscriptions.empty();
}
})
.subscribeOn(Schedulers.io());
}
public static void main(String[] args) {
Observable<String> userIdObservable = getUserIds();
Observable<String> t = Observable.mergeDelayError(userIdObservable.map(new Func1<String, Observable<String>>() {
public Observable<String> call(final String t1) {
Observable<String> info1 = getInfo("1::: ", t1, "2");
Observable<String> info2 = getInfo("2::: ",t1, "3");
return Observable.merge(info1, info2);
}
}));
//rest is the same
}
}
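To make the "run both requests in parallel, skip the user if either fails" idea concrete without depending on a particular RxJava version, here is a rough equivalent using java.util.concurrent.CompletableFuture. It is a stand-in sketch under that assumption, not the code above; `fetchAll` is a hypothetical name and `getInfo` only mimics the question's test helper. Note that it does not cancel the sibling request when one of the two fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ParallelPerUser {
    /** Stand-in for the question's getInfo: fails when the id contains errorNumber. */
    static String getInfo(String prefix, String id, String errorNumber) {
        if (id.contains(errorNumber)) {
            throw new IllegalStateException("request failed for user " + id);
        }
        return prefix + id;
    }

    static List<String> fetchAll(List<String> ids) {
        // Start both requests for every user up front so they can overlap.
        List<CompletableFuture<String>> perUser = new ArrayList<>();
        for (String id : ids) {
            CompletableFuture<String> info1 = CompletableFuture.supplyAsync(() -> getInfo("1::: ", id, "2"));
            CompletableFuture<String> info2 = CompletableFuture.supplyAsync(() -> getInfo("2::: ", id, "3"));
            // The combined future fails if either request failed.
            perUser.add(info1.thenCombine(info2, (a, b) -> a + ", " + b));
        }
        // Collect in user order, skipping users whose combined future failed.
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : perUser) {
            try {
                results.add(future.join());
            } catch (CompletionException e) {
                // one of this user's requests failed: skip the user, keep the rest
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("1", "2", "3", "4", "5", "6")));
    }
}
```

Because the results are joined in the order the users were submitted, the output order is deterministic even though the requests themselves may complete in any order.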