使用 share():将最后一个值发布给新订阅者


@Override
public void onPause() {
super.onPause();
compositeDisposable.clear();
}
@Override
public void onResume() {
super.onResume();
Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
.distinctUntilChanged()
.share();
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 1", data.value)});
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 2", data.value)});
}

当 distanceFlowable 不断获得新的更改时,代码运行良好。但是,在距离上只有一个帖子的情况下,只有观察者 1会收到通知。

它的行为如下所示:

  1. 发布在距离上可流动
  2. 观察者 1 订阅
  3. 日志猫打印"观察者1:133">
  4. 观察者 2 订阅

我希望它的行为像:

  1. 发布在距离上可流动
  2. 观察者 1 订阅
  3. 日志猫打印"观察者1:133">
  4. 观察者 2 订阅
  5. 日志猫打印"观察者2:133">

我尝试将ConnectedFlowablepublish()一起使用,然后在订阅观察者 1观察者 2connect()流对象。但是,即使在清除compositeDisposable并且没有人听之后,它仍然在 flowable 上发布。

解决此问题的首选方法是什么?

我错过了flowable.connect()返回一个可以添加到我的compositeDisposableDisposable,这样它最终会在 onPause 中被清除。

只需使用.compose(ReplayingShare.instance());https://github.com/JakeWharton/RxReplayingShare

这应该行得通。

相关内容

最新更新