等待一个Observable完成后再执行另一个Observable



问题

我有一个活动,它定期从API获取数据并显示接收到的数据。API使用OAuth,所以我收到一个临时访问令牌,该令牌在一段时间(1小时)后到期。如果应用程序试图使用过期的令牌获取数据,显然请求将失败。在我的应用程序的早期迭代中,我使用AsyncTasks进行网络请求,基本上只是执行一个新的AsyncTask,在调用从服务器获取数据的主AsyncTask之前获得一个新的访问令牌。这工作得很好,因为主AsyncTask会等到另一个AsyncTask完成后才执行。

我最近切换到RxJava,基本上只是用Observables替换了AsyncTasks。问题是,获取数据的主Observable不会等待刷新访问令牌的Observable完成。这是我的代码,谢谢你的帮助。

LiveThreadActivity.java

private Subscription subscription;
private Observable<List<CustomComment>> fetchData;
@Override
protected void onResume() {
    super.onResume();
    if (tokenExpired()) {
        auth.refreshToken();
    }
    subscription = fetchData
            .compose(bindToLifecycle())
            .retryWhen(new RetryWithDelay(5, 2000))
            .subscribe(list -> addNewComments(list), e -> handleFetchDataError(e));
}

// This method gets called in onCreate()
private void dataCollection() {
    fetchData = Observable.interval(0, REFRESH_RATE, TimeUnit.MILLISECONDS)
            .map(tick -> fetchNewComments())            // Run function every time a tick is emitted
            .retryWhen( new RetryWithDelay(2, 2000) )   // Retry twice with 2 second delay
            .subscribeOn(Schedulers.io())               // Network stuff in background thread
            .observeOn(AndroidSchedulers.mainThread()); // Other stuff on the main thread
}

Auth.java

public class Auth {
    ...
    public void refreshToken() {
        Observable.just(1)
                .map(y -> refreshAccessToken())
                .retryWhen( new RetryWithDelay(3, 2000) )
                .subscribeOn(Schedulers.io())
                .subscribe();
    }
}

使用响应式库需要一种新的思维方式。您必须将代码编写为同步的,但要注意它是异步执行的。

你的代码只是同步执行。它同时执行两个Observable

函数refreshToken()应该是这样的:

public Observable<?> refreshToken() {
    return Observable.just(1)
            .map(y -> refreshAccessToken())
            .retryWhen( new RetryWithDelay(3, 2000) )
            .subscribeOn(Schedulers.io());
}

onResume():

@Override
protected void onResume() {
    super.onResume();
    Observable obs = fetchData
            .compose(bindToLifecycle())
            .retryWhen(new RetryWithDelay(5, 2000));
    if (tokenExpired()) {
        obs = obs.startWith(auth.refreshToken());
    }
    subscription = obs
            .subscribe(list -> addNewComments(list), e -> handleFetchDataError(e));
}

注意startWith()操作符。它允许一个接一个地执行Observable(获取列表)(刷新令牌)。

. flatmap()可能就足够了,即tokenObservable。flatMap(/* return dataObservable */)

相关内容

  • 没有找到相关文章

最新更新