用RxJava替换侦听器



目前我正在研究向RxJava的迁移,并决定我的一个管理器(accountManager)将是一个有趣的开始。当前,Manager有一个侦听器列表,并在帐户更新或出现问题时发送相应的更新。

private List<WeakReference<ProfileChangeListener>> mListeners = new ArrayList<>();
public interface ProfileChangeListener {
    void onProfileUpdated(Account account);
    void onProfileFailed(Exception e);
}

我的Rx解包含一个Subject

private SerializedSubject<Account, Account> mManagerSubject = new SerializedSubject<>(BehaviorSubject.<Account>create());
public Observable<Account> observe() {
    return mManagerSubject;
}

,然后当更新发生时,我调用以下其中一个:

private void onProfileUpdated(Account account) {
    mManagerSubject.onNext(account);
}
private void onProfileFailed(final Exception e) {
    mManagerSubject.onError(e);
}

问题是,一旦onError被调用,任何人通过observe收听将永远不会从onNext获得另一个更新。

我仍然希望订阅者接收onError,以便他们可以处理错误状态,但在稍后的时间onNext仍然可以用更新的帐户调用,我仍然希望订阅者处理更新的帐户。

我尝试过使用onErrorResumeNext, onErrorReturnonExceptionResumeNext的解决方案,但它们都没有传播onError

TLDR:我如何在onError被调用后保持订阅者订阅,同时仍然传播onError?

Rx中的"Errors"一开始可能有点难以理解,因为它们的含义与大多数人所期望的略有不同。

错误处理文档(重点是我的):

一个可观察对象通常不会抛出异常。相反,它会用onError通知终止Observable序列,从而通知所有观察者发生了不可恢复的错误

Observable遇到不可恢复的错误时,应该使用

onError()—即当Observable无法继续发出项时。当您订阅时,您可能会使用onErrorResumeNext之类的内容来尝试一些恢复操作,但这应该是源Observable的结尾。

相反,您可能希望调整Observable发出的内容以支持发出错误项,或者包含指示遇到错误的标志。

如果您的错误确实是不可恢复的,那么您可能需要重新审视您的恢复策略并尝试稍微不同的方法。

相关内容

  • 没有找到相关文章

最新更新