如何在不启动新流程的情况下再次订阅Observable



我有可观测

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            for (int i = 0; i < 100; i++) {
               Thread.sleep(100);
              subscriber.onNext("Loading:"+i);
            }
            subscriber.onCompleted();
        }
    });

用户

 Subscriber<? super String> sub = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    pDialog.setMessage("Successfully Done!");
                    pDialog.cancel();
                }
            @Override
            public void onError(Throwable e) {
                pDialog.cancel();
            }
            @Override
            public void onNext(String string) {
                pDialog.setMessage(string);
            }
        };

点击按钮后,我这样做:

  compositeSubscription.add(observable.subscribeOn(Schedulers.newThread()).observeOn(
            AndroidSchedulers.mainThread()).subscribe(sub));

活动中:

@Override
protected void onStart() {
    super.onStart();
   compositeSubscription = new CompositeSubscription();
}
 @Override
    protected void onStop() {
        if (!compositeSubscription.isUnsubscribed()&&compositeSubscription.hasSubscriptions())
            compositeSubscription.unsubscribe();
        super.onStop();
    }

单击按钮后,将显示对话框并更新消息。

但如果我最小化应用程序并再次打开它,订阅会丢失,消息不会更新,但进程会继续执行。

如何在不启动新流程的情况下再次订阅Observable

1)您在这里处理的是冷可观察,这意味着每次订阅都会调用OnSubscribe.call()。您可以在此处使用publish(),它将Observable转换为ConnectableObservable或任何其他将转化为的运算符。

2) 如果您希望您的Observable在从后台返回后继续工作,那么onStart/onStop可能不是正确的生命周期回调。我会选择onCreate/onDestroy那里的

3) 取消订阅不会停止OnSubscribe.call()的执行。最简单的解决方案是在循环中检查isUnsubscribed()并相应地停止。

使用主题:

Observable<Long> observable = Observable.interval(100, TimeUnit.MILLISECONDS);
PublishSubject<Object> subject = PublishSubject.create();
observable.subscribe(subject);
Subscription subscription = subject.subscribe(o -> {
    System.out.println("o = " + o);
});
Thread.sleep(500);
subscription.unsubscribe();
Subscription subscription2 = subject.subscribe(o -> {
    System.out.println("continued = " + o);
});
Thread.sleep(500);
subscription.unsubscribe();

这将产生以下输出:

o = 0
o = 1
o = 2
o = 3
o = 4
continued = 5
continued = 6
continued = 7
continued = 8
continued = 9