我是RxJava的新手,我正在努力弄清楚如何正确关闭资源,特别是在处理多个订阅者时。
我有一个Observable<T>
,其中T
是一些Closeable
资源(例如,Android数据库Cursor
。
我可以在可观察对象上有多个订阅者。我想在每个订阅者完成对资源的处理后对其进行close()
。换句话说,在新资源交付/发出后关闭旧资源,并在最后一个订阅者退订时关闭最后一个资源。
我试着用一个自定义的操作符,我叫AutoCloseOperator
,它几乎工作,但不完全正确。例如,我仍然存在竞争条件和/或泄漏,例如,资源未被关闭。
在RxJava中正确的方法是什么?
假设我有这样的代码:
final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);
Subscription s1 = o.subscribe(new Action1<MyResource>() {
public void call(MyResource myObj) {
System.out.println("s1 handling " + myObj);
}
});
subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered
Subscription s2 = o.subscribe(new Action1<MyResource>() {
public void call(MyResource myObj) {
System.out.println("s2 handling " + myObj);
}
});
subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));
s1.unsubscribe();
subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));
s2.unsubscribe();
subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));
那么我希望有以下行为:
s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8
注意:我没有在我的实际代码中使用PublishSubject
,我在这里使用它只是为了说明目的,我使用Observable.create
,每次数据库表更新时都会发出Cursor
…
概括一下这个问题:我可以只使用doOnNext
和doOnUnsubscribe
来关闭旧项目,但这没有考虑到这些事件将多次发生(对于每个订阅者),并且我只想在所有订阅者都收到新项目时关闭资源。
是使用lift()
的自定义操作符,还是有一些现有的操作符可以帮助实现这一点?
我已经把我的问题减少到GitHub上的一个小命令行应用程序。谢谢你的关注!
Observable.using()
就是你需要的。
如果你的t
类型是T
,它有一个.close()
方法,你想从t
(你的光标)中提取一些东西,比如Observable<R>
,那么下面是怎么做的:
Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();
Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);
你提到你有Observable<T>
。要从所有t中获得所有r,然后使用上面的代码,如下所示:
Observable<T> source = ...
Observable<R> results =
source.flatMap(t -> {
Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();
return Observable.using(resourceFactory, observableFactory, disposeAction);});