我对RxJava并不完全陌生,但我被一个看似简单的任务所阻碍。
我有一个数据源,它暴露了一个响应式API,所有我想做的就是获取一些数据,返回它,并在没有其他东西要发出时自动关闭连接。
下面是我的代码:public Observable<Object> execute(String query) {
Single<RxConnection> rxConnection = getRxDB().getConnection();
return rxConnection.flatMapObservable(conn -> {
Observable<Object> rxResult = conn.query(query);
return rxResult.doOnCompleted(() -> {
conn.close(); // THIS DOES NOT WORK. I would like to close the connection and to wait without blocking.
});
});
}
conn.query()和conn.close()在不同的scheduler中异步执行。这段代码不能工作,因为conn.close()返回一个没有订阅者的Completable。另外,如果我在doOnCompleted方法本身中手动订阅,rxResult Observable在连接关闭之前就完成了。
我希望"execute(String query)"方法返回一个Observable:-发出conn.query()调用获取的所有项-当没有更多的元素可以释放时,它会触发conn.close()-只在conn.close() Completable之后完成
谢谢。
如何:
Observable<Object> rxResult = conn.query(query)
.concatWith(conn.close().toObservable())
.onErrorResumeNext(e ->
conn.close().toObservable().concatWith(Observable.error(e)));
使用Observable.using
:
Observable<T> obs =
Observable.using(
resourceFactory,
observableFactory,
disposeAction)
此创建方法确保由resourceFactory
创建的资源在终止(完成或错误)或取消订阅时被处置。
所以,你想只在可观察对象被订阅时执行查询:
private Observable<Object> executeDeferred(String query) {
// your original execute() method here, including doOnComplete
}
public Observable<Object> execute(String query) {
return Observable.defer(() -> executeDeferred(query));
}
这将确保连接,查询和释放只发生在可观察对象获得订阅时,而不是在设置期间。
编辑:您还需要一个doOnUnsubscribe/doOnTerminate,以关闭连接时过早退订发生