RxJava -在完成之前触发另一个Observable/Completable,只在它之后才完成



我对RxJava并不完全陌生,但我被一个看似简单的任务所阻碍。

我有一个数据源,它暴露了一个响应式API,所有我想做的就是获取一些数据,返回它,并在没有其他东西要发出时自动关闭连接。

下面是我的代码:
public Observable<Object> execute(String query) {
    Single<RxConnection> rxConnection = getRxDB().getConnection();
    return rxConnection.flatMapObservable(conn -> {
        Observable<Object> rxResult = conn.query(query);
        return rxResult.doOnCompleted(() -> {
            conn.close(); // THIS DOES NOT WORK. I would like to close the connection and to wait without blocking.
        });
    });
}

conn.query()和conn.close()在不同的scheduler中异步执行。这段代码不能工作,因为conn.close()返回一个没有订阅者的Completable。另外,如果我在doOnCompleted方法本身中手动订阅,rxResult Observable在连接关闭之前就完成了。

我希望"execute(String query)"方法返回一个Observable:-发出conn.query()调用获取的所有项-当没有更多的元素可以释放时,它会触发conn.close()-只在conn.close() Completable之后完成

谢谢。

如何:

Observable<Object> rxResult = conn.query(query)
.concatWith(conn.close().toObservable())
.onErrorResumeNext(e -> 
    conn.close().toObservable().concatWith(Observable.error(e)));

使用Observable.using:

管理资源闭包
Observable<T> obs = 
    Observable.using(
        resourceFactory,
        observableFactory,
        disposeAction)

此创建方法确保由resourceFactory创建的资源在终止(完成或错误)或取消订阅时被处置。

所以,你想只在可观察对象被订阅时执行查询:

private Observable<Object> executeDeferred(String query) {
   // your original execute() method here, including doOnComplete
}

public Observable<Object> execute(String query) {
  return Observable.defer(() -> executeDeferred(query));
}

这将确保连接,查询和释放只发生在可观察对象获得订阅时,而不是在设置期间。

编辑:您还需要一个doOnUnsubscribe/doOnTerminate,以关闭连接时过早退订发生

相关内容

  • 没有找到相关文章

最新更新