有我的方法:
public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
api.openWorkshift(workshiftSettings)
.compose(RxOperatorsHelpers.additionalStacktrace())
.doOnSubscribe(() -> actionsSystem.registerAction(...).await()) // <-
.doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()); // <-
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError)
.subscribe(subscriber);
}
其中ActionsSystem.registerAction(...)
/ActionsSystem.unregisterActions(...)
看起来像:
public Completable registerAction(OperatorAction action) {
return Completable.fromAction(() -> actions.add(action));
}
public Completable unregisterAction(OperatorAction action) {
return Completable.fromAction(() -> actions.remove(action));
}
正如您所看到的,我在源Observable的流中使用.await()
来执行Completable
。这感觉像是错误的解决方案我怎样才能做得更优雅
由于Completable
执行琐碎的操作,您可以简单地将它们的代码内联到doOnSubscribe
和doOnUnsubscribe
:中
.doOnSubscribe(() -> actions.add(action))
.doOnUnsubscribe(() -> actions.remove(action))
您可以通过从可完成的andThen
开始避免doOnSubscribe
——Observable
序列的其余部分:
actionsSyste.registerAction(...)
.andThen(api.openWorkshift(workshiftSettings)
.compose(RxOperatorsHelpers.additionalStacktrace())
.doOnUnsubscribe(() -> actionsSyste.unregisterAction(...).await())
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError)
)
.subscribe(...)
目前,当下游取消订阅时,没有办法执行Completable
,当序列可能正常或异常终止时,也没有简单的方法执行它。
您可以使用Observable.defer
。该运营商延迟创建可观察对象,直到订阅:
Observable observable = Observable.defer(() -> {
actions.add(action);
api.openWorkshift(workshiftSettings)
}).compose(RxOperatorsHelpers.additionalStacktrace())
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError);
然后使用CCD_ 13。根据文件Subscription.create()
:
创建并返回一个订阅,该订阅在取消订阅时调用给定的Action0。
所以基本上你需要做:
Subscription subscription = Subscriptions.create(new Action0() {
@Override
public void call() {
actionsSyste.unregisterAction(...);
}
});
subscriber.add(subscription);
observable.subscribe(subscriber);