Retrofit + RxJava Android onNext 从未调用过,但 onComplete 是



我一直在使用Retrofit进行Android应用程序的REST API调用,并且效果很好。 但是,我遇到了有关失去连接和请求失败的问题。 我的应用程序涉及以编程方式连接到 WiFi 网络,然后在连接后发出 API 调用进行初始化。 但是,有时这些连接很慢,我需要一种方法能够在这些请求失败时重新发送这些请求。 这就是我转向RxJava的地方,因为在Retrofit中不支持它。 我想这样做的是一旦连接到新的WiFi网络,就会向REST API发出ping,如果出现网络错误,请等待一秒钟,然后重试ping。 这是我的代码:

接口中的函数声明:

@Headers("Content-Type: application/json")
@GET("something")
io.reactivex.Observable<String> ping();

活动中的改造声明

Retrofit retrofit = new Retrofit.Builder()
.baseUrl(Data.cloudEndpoint)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
avfsService = retrofit.create(MyClass.class);

尝试在活动中执行 API 调用

service.ping()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
Log.d(TAG, "retryWhen");
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscibe " + d.toString());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
e.printStackTrace();
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
launch();
}
});

所以当执行这个时,这是我在 Logcat 中看到的输出

onSubscibe null
retryWhen
onComplete

所以Next从来没有被调用过,我也不太确定为什么。 我认为每次从 Obeservable 发出数据时都会调用 onNext,我认为通过创建 Observable 并订阅它,API 调用将被执行,响应将由 Observable 发出。 情况似乎并非如此,因为我现在甚至无法重新创建网络错误,并且立即调用onComplete。

我在这里使用blockingSubscribe,因为我看到其他人说在io线程上执行并在主线程上观察会有所帮助,但没有运气。 当我只使用 .subscribe() 时,我看到了相同的行为。

我想我错过了一些东西,但不明白我的 API 调用是否没有执行,或者我只是没有得到正确的响应。

任何帮助将不胜感激。

您的困惑可能来自这样一个事实,即retryWhen运算符只调用您的重试逻辑函数一次。 由于它传递了Throwables 的Observable,因此它希望您在获得从throwableObservable发出的新error时发出重试可观察量。 最简单的方法是将错误flatmap到重试逻辑中——

.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
Log.d(TAG, "retryWhen");
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Log.d(TAG, "retrying");
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
})

如果您运行上述操作,您仍然会看到"retryWhen"被调用一次,然后在出现错误时每秒"重试"一次,直到您获得onNextonComplete。 你可能希望添加代码以在一定时间后放弃,或者至少确保正确处理订阅,使其不会永远运行。

作为旁注,这段代码(以及一般的 RxJava)在 lambda 中更具可读性。

如果你还在努力解决retryWhen看看Dan Lew的这篇文章有一个很好的解释。

试试这个:

service.ping()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
Log.d(TAG, "retryWhen");
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.subscribe(new Subscriber<String>() {    
@Override
public void onNext(String s) {
Log.d(TAG, "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
e.printStackTrace();
}
@Override
public void onCompleted() {
Log.d(TAG, "onComplete");
launch();
}
});

您的问题来自您正在使用blockingSubscribe的事实。这会阻止线程,直到可观察量完成。

由于您告诉它在 android 线程上观察,并且这可能是被阻止的线程,因此除非调用onComplete,否则它永远不会有机会发送这些onNext调用,这使得它忽略任何进一步的onNext调用。

您给定的代码可能不会死锁,因为如果没有明确告知observeOn保留顺序onCompleteonError调用优先级的权利。这意味着它允许onComplete调用通过,从而解锁线程。

最新更新