如何保持观察对象直到onError() /取消订阅



我需要观察对象列表,并在列表中出现新对象时立即发出事件。就像客户端-服务器应用程序,客户端向列表中添加内容,服务器将新添加的内容发送给在服务器中注册的所有客户端。

到目前为止,我的可观察对象只发出一个项目,之后再没有其他项目。换句话说,当一个新对象被添加到列表中时,有一个事件,但是当第二个(或者第三、第四、第五……)对象被添加时,没有事件。

我需要它在列表中有新对象时保持发出项。

这是我的代码,其中MyList是我们说我自己实现的List(与我的问题无关)。我希望发出的事件是新添加到MyList的对象。

private void onClickMethod() {
    MyList myList = populateMyList();
    onNexAction = new Action1<MyList>() {
        @Override
        public void call(MyList myList) {
            System.out.println("call()");
            TheObject theObject = myList.getNext();
            System.out.println("New Event: " + theObject.getData());
        }
    };
    myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
        @Override
        public void call(Subscriber<? super MyList> t) {
            t.onNext(myList);
        }
    });
    myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}

上面的代码将只产生第一个添加的对象。但是如果我在call()中放置一个while循环,它将像我想要的那样发出所有事件。但我觉得这不是我想要的。我觉得我更喜欢轮询而不是异步。

下面是完全相同的代码,但是使用了while循环:

private void onClickMethod() {
    MyList myList = populateMyList();
    onNexAction = new Action1<MyList>() {
        @Override
        public void call(MyList myList) {
            System.out.println("call()");
            while (myList.size() > 0) { // The while cycle
                TheObject theObject = myList.getNext();
                System.out.println("New Event: " + theObject.getData());
            }
        }
    };
    myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
        @Override
        public void call(Subscriber<? super MyList> t) {
            t.onNext(myList);
        }
    });
    myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}

那么,我哪里做错了呢?我怎么能保持我的可观察对象发出一个新的事件,只要有一个新的对象在我的列表?

编辑:

根据Tassos Bassoukos的建议,我像这样应用distinct()运算符:

myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).distinct().subscribe(onNexAction);

但这不能解决我的问题。

这是PublishSubject的典型案例:

class MyList<T> {
    final PublishSubject<T> onAdded = PublishSubject.create();
    void add(T value) {
        // add internally
        onAdded.onNext(value);
    }
    Observable<T> onAdded() {
        return onAdded;
    }
}
MyList<Integer> list = populate();
Subscription s = list.onAdded()
    .subscribe(v -> System.out.println("Added: " + v));
list.add(1);  // prints "Added: 1"
list.add(2);  // prints "Added: 2"
list.add(3);  // prints "Added: 3"
s.unsubscribe(); // not interested anymore
list.add(4);  // doesn't print anything
list.add(5);  // doesn't print anything

根据您的代码,当用户单击(某物)时,将调用onClickMethod

这个方法将创建一个对象myList,您将构建一个将发出此对象的项。然后您将使用该对象。

事实上,你只会发出你的列表一次,这就是为什么你只通知一次(一次点击)

为了达到你想要的效果,一种可能的方法是当你添加一个项目时发出列表中的每个项目。

private void onClickMethod() {
     MyList myList = populateMyList();
     myObservable = Observable.create(subscriber -> {
               int currentSize = myList.size();
               while(myList.size() != currentSize) {
                    subscriber.onNext(myList.getNext());
                    currentSize = myList.size();
               }
               subscriber.onCompleted();
     });
   myObservable.observeOn(Schedulers.newThread())
               .subscribeOn(Schedulers.io())
               .subscribe(item -> System.out.println(item));

}

请注意,这段代码是不是性能,因为它将在myList.size()上循环,如果有人在循环代码中添加/删除项目,您可能会遇到问题。

更好的方法是在myList中添加新项时收到通知,然后发出这个新项。

首先,您没有从可观察对象中获得多个值的原因是,您传递给Observable.create的方法只调用一次,并且在第一次订阅时发生。如果你只是想把你的列表作为一个可观察对象,最简单的方法是让MyList类实现Iterable,你可以直接使用from代替

private void onClickMethod() {
 MyList myList = populateMyList();
 Observable.from(myList)
           .observeOn(Schedulers.newThread())
           .subscribe(item -> System.out.println(item));
}

相关内容

  • 没有找到相关文章

最新更新