RxJava:仅当第一个可观察量抛出错误并从第一个开始重复时,才执行第二个可观察量



我正在使用retorift点击getAricleapi并获取与用户相关的文章列表。 如果传递的令牌已过期,getArticleapi 将抛出错误,如果是这样,那么我必须调用refreshTokenapi 才能获得新令牌,然后我必须再次调用getArticleAPI

ApiController.createRx().getArticle(token)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ response -> toast(response.body().url) }, { e ->
println(e.printStackTrace())
if(e is HttpException && e.code() in  arrayOf(401,403)){                      
//Here I want to call refresh tolken api
toast("Auth error")
}
else
toast(R.string.something_went_wrong)
})

编辑

尽管给出的答案显示出一些方向,但这些并不是对我问题的直接回答。这就是解决它的方式,但我觉得这可以重构为更好的代码

ApiController.createRx().getArticle(Preference.getToken())
.flatMap { value ->
if (value.code() in arrayOf(403, 401)) {
ApiController.refreshToken()
ApiController.createRx().getArticle(Preference.getToken())
} else Observable.just(value)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ response -> println("Success") }, { e ->
e.printStackTrace()
toast(R.string.something_went_wrong)
})

fun refreshToken() {
val token:String?=ApiController.createRx().refreshToken(Preferences.getRefreshToken()).blockingFirst()?.body()?.token
if (token != null) Preferences.setAuthToken(token)
}

编辑

我将代码重构为更干净的版本

Observable.defer { ApiController.createRx().getArticle(Preferences.getToken()) }
.flatMap {
if (it.code() in arrayOf(401, 403)) {
ApiController.refreshToken()
Observable.error(Throwable())
} else Observable.just(it)
}
.retry(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({println("Success") }, {
it.printStackTrace()
toast(R.string.something_went_wrong)
})

fun refreshToken() {
var token: String? = null
try {
token = createRx().refreshToken(Preferences.getRefreshToken()).blockingFirst().body()!!.token
} catch (e: Exception) {
throw e
}
println("saving token")
if (token != null) Preferences.setAuthToken(token)
}

编辑

请检查我的答案以获取最终重构的代码

我已经实现了这个确切的东西。下面是该代码的略微修改版本:

private Observable<Object> refreshTokenIfNotAuthorized(Observable<? extends Throwable> errors) {
final AtomicBoolean alreadyRetried = new AtomicBoolean(false);
return errors.flatMap(error -> {
boolean isAuthorizationError = /* some logic analyzing each error*/ ;
if (isAuthorizationError && !alreadyRetried.get()) {
try {
alreadyRetried.set(true);
String newToken = federatedTokenRefresher.refreshToken()
.toBlocking()
.first();
setLogin(newToken);
return Observable.just(null);
} catch (Exception e) {
return Observable.error(error);
}
}
return Observable.error(error);
});
}

您可以像这样使用此方法:

doSomethingRequiringAuth().retryWhen(this::refreshTokenIfNotAuthorized);

您将收到哪种错误?似乎您可以使用onErrorResumeNext运算符。

这个运算符一旦收到一个可抛出的,就允许你返回一个可观察的,而不是在onError中返回可抛出的

@Test
public void observableOnErrorResumeException() {
Integer[] numbers = {0, 1, 2, 3, 4, 5};
Observable.from(numbers)
.doOnNext(number -> {
if (number > 3) {
try {
throw new IllegalArgumentException();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
})
.onErrorResumeNext(t -> Observable.just(666))
.subscribe(System.out::println);
}

您可以在此处查看更多示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

我会给你另一个使用groupby运算符的选项

/**
* In this example we create a response code group.
*/
@Test
public void testGroupByCode() {
Observable.from(Arrays.asList(401,403, 200))
.groupBy(code -> code)
.subscribe(groupByCode -> {
switch (groupByCode.getKey()) {
case 401: {
System.out.println("refresh token");
processResponse(groupByCode);
break;
}
case 403: {
System.out.println("refresh token");
processResponse(groupByCode);
break;
}
default: {
System.out.println("Do the toast");
processResponse(groupByCode);
}
}
});
}
private void processResponse(GroupedObservable<Integer, Integer> groupByCode) {
groupByCode.asObservable().subscribe(value -> System.out.println("Response code:" + value));
}

我在阅读了更多关于RxJava的信息后解决了我的问题,这就是我实现它的方式。 首先,retrofit抛出 4xx 错误onErroronNextonSuccess这取决于我们如何定义它。 前任:

@GET("content") fun getArticle(@Header("Authorization") token: String):Single<Article>

这会将所有 4xx 错误抛给onError,而不是Single<Article>如果您将其定义为Single<Response<Article>>则来自服务器(包括 4xx(的所有响应都将转到onNextonSuccess

Single.defer { ApiController.createRx().getArticle(Preferences.getAuthToken())}
.doOnError {
if (it is HttpException && it.code() == 401)
ApiController.refreshToken()
}
.retry { attempts, error -> attempts < 3 && error is HttpException && error.code() == 401 }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({println("Success") }, {
it.printStackTrace()
toast(R.string.something_went_wrong)
})

我使用defer作为我实际Observable的包装器,因为我想在令牌刷新后重新创建重试时可观察到的文章获取,因为我希望在我的刷新令牌代码存储新获取的令牌时再次调用Preferences.getAuthToken()优先。

如果HttpException为 401 并且尝试重试次数不超过 2 次,则返回retry

最新更新