RxJava:在每个订阅者处理完资源后关闭资源



我是RxJava的新手,我正在努力弄清楚如何正确关闭资源,特别是在处理多个订阅者时。

我有一个Observable<T>,其中T是一些Closeable资源(例如,Android数据库Cursor

我可以在可观察对象上有多个订阅者。我想在每个订阅者完成对资源的处理后对其进行close()。换句话说,在新资源交付/发出后关闭旧资源,并在最后一个订阅者退订时关闭最后一个资源。

我试着用一个自定义的操作符,我叫AutoCloseOperator,它几乎工作,但不完全正确。例如,我仍然存在竞争条件和/或泄漏,例如,资源未被关闭。

在RxJava中正确的方法是什么?

假设我有这样的代码:

final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);
Subscription s1 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s1 handling " + myObj);
    }
});
subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered
Subscription s2 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s2 handling " + myObj);
    }
});
subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));
s1.unsubscribe();
subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));
s2.unsubscribe();
subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));

那么我希望有以下行为:

s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8

注意:我没有在我的实际代码中使用PublishSubject,我在这里使用它只是为了说明目的,我使用Observable.create,每次数据库表更新时都会发出Cursor

概括一下这个问题:我可以只使用doOnNextdoOnUnsubscribe来关闭旧项目,但这没有考虑到这些事件将多次发生(对于每个订阅者),并且我只想在所有订阅者都收到新项目时关闭资源。

是使用lift()的自定义操作符,还是有一些现有的操作符可以帮助实现这一点?

我已经把我的问题减少到GitHub上的一个小命令行应用程序。谢谢你的关注!

Observable.using()就是你需要的。

如果你的t类型是T,它有一个.close()方法,你想从t(你的光标)中提取一些东西,比如Observable<R>,那么下面是怎么做的:

Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();
Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);

你提到你有Observable<T>。要从所有t中获得所有r,然后使用上面的代码,如下所示:

Observable<T> source = ...
Observable<R> results = 
    source.flatMap(t -> {
        Func0<T> resourceFactory = () -> t;
        Func1<T, Observable<R>> observableFactory = x -> ...
        Action1<T> disposeAction = x -> x.close();
        return Observable.using(resourceFactory, observableFactory, disposeAction);});

相关内容

  • 没有找到相关文章

最新更新