rxjava ObservableCreate with emitter vs rxrelay PublishRelay



我对反应式编程有点陌生,所以我想先解释我的用例和我做了什么,然后解释我的解决方案的问题,以及我认为答案是什么,但我不确定。

这是我的用例:我有一个android应用程序,它试图连接到远程服务器。当用户点击一个按钮时,我正试图通过反应式编程连接到这个服务器,并且我试图连接到serveral";连接器";所以我对它们进行迭代,直到其中一个连接器成功。

我就是这样做的:

Disposable disposable = currentConnector.connect(ip)
.observeOn(AndroidSchedulers.mainThread())
.flatMap(fallbackDefaultConnector())
.subscribeWith(connectionSubscriber);

这是connectionSubscriber:

new DisposableObserver<Boolean>() {
@Override
public void onNext(@NonNull Boolean item) {
Log.d(TAG, "Observable emits: " + item);
if (item) {
Log.d(TAG, "Connected!");
} else {
handleConnectionFailed();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "On Error" + Log.getStackTraceString(e));
handleConnectionFailed();
}
@Override
public void onComplete() {
closeConnectionProcedure();
}
};

这是currentConnector.connect(ip)的一个示例(请注意超时(:

@Override
public Observable<Boolean> connect(String ipAddress) {
isConnected = false;
client.setHost(ipAddress);
Observable<Boolean> observable = client.connect
.timeout(CONNECTION_TIMEOUT, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io());
connectionEmitter = client.getConnectionEmitter();
return observable;
}

现在是棘手的部分->在这个连接器中,我保存客户端的emitter。为什么?因为客户端将我重定向到另一个配对片段。这就是client.connect在客户端的构造函数中的样子:

connect = new ObservableCreate<>(emitter -> {
connectionEmitter = emitter;
doConnect(host);
});

doConnect(host)在客户端内部成功时,我会发送这样的消息:

connectionEmitter.onNext(true);
connectionEmitter.onComplete();

它是有效的。当客户端连接时,一切都很好。我的问题是当连接在所有连接器上失败时,或者如果用户收到超时异常。如果失败,我会发送这样的失败:

connectionEmitter.onNext(false);
connectionEmitter.onComplete();

并且实际上用户看到一个";连接失败";

那么问题出在哪里呢如果失败,用户尝试再次连接,则不会发生任何事情我对此进行了研究,发现在observable发生onComplete事件或onError事件后,发射器不能发射更多事件。

所以我对此有两个想法——

  1. 也许我没有使用反应式编程,这对我的用例来说是正确的
  2. 也许我需要用rxrelayPublishRealy来实现它?如果是这样,我如何保存发射器并执行除accept(value)之外的更多逻辑

保存发射器的设计选择似乎是错误的。一旦发射器完成发射,就应该释放它进行GC。Rx就是这样设计的,如果你需要重做工作,那么你应该创建一个新的发射器或可观察的实例,然后你可以观察这个新实例。