如何在订阅/取消订阅时执行Completable



有我的方法:

public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
    api.openWorkshift(workshiftSettings)
            .compose(RxOperatorsHelpers.additionalStacktrace())
            .doOnSubscribe(() -> actionsSystem.registerAction(...).await()) // <-
            .doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()); // <-
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .subscribe(subscriber);
}

其中ActionsSystem.registerAction(...)/ActionsSystem.unregisterActions(...)看起来像:

public Completable registerAction(OperatorAction action) {
    return Completable.fromAction(() -> actions.add(action));
}
public Completable unregisterAction(OperatorAction action) {
    return Completable.fromAction(() -> actions.remove(action));
}

正如您所看到的,我在源Observable的流中使用.await()来执行Completable。这感觉像是错误的解决方案我怎样才能做得更优雅

由于Completable执行琐碎的操作,您可以简单地将它们的代码内联到doOnSubscribedoOnUnsubscribe:中

        .doOnSubscribe(() -> actions.add(action))
        .doOnUnsubscribe(() -> actions.remove(action))

您可以通过从可完成的andThen开始避免doOnSubscribe——Observable序列的其余部分:

actionsSyste.registerAction(...)
.andThen(api.openWorkshift(workshiftSettings)
        .compose(RxOperatorsHelpers.additionalStacktrace())
        .doOnUnsubscribe(() -> actionsSyste.unregisterAction(...).await())
        .subscribeOn(ioScheduler)
        .observeOn(uiScheduler)
        .doOnError(this::handleError)
 )
 .subscribe(...)

目前,当下游取消订阅时,没有办法执行Completable,当序列可能正常或异常终止时,也没有简单的方法执行它。

您可以使用Observable.defer。该运营商延迟创建可观察对象,直到订阅:

Observable observable = Observable.defer(() -> {
        actions.add(action);
        api.openWorkshift(workshiftSettings)
}).compose(RxOperatorsHelpers.additionalStacktrace())
    .subscribeOn(ioScheduler)
    .observeOn(uiScheduler)
    .doOnError(this::handleError);

然后使用CCD_ 13。根据文件Subscription.create():

创建并返回一个订阅,该订阅在取消订阅时调用给定的Action0。

所以基本上你需要做:

Subscription subscription = Subscriptions.create(new Action0() {
    @Override
    public void call() {
        actionsSyste.unregisterAction(...);
    }
});    
subscriber.add(subscription);
observable.subscribe(subscriber);

相关内容

  • 没有找到相关文章

最新更新