Problems with RXJava



我正在改编what3words的一些示例代码,以便通过Java SDK访问他们的API。它使用RXJava。

示例代码为:

Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> {
if (result.isSuccessful()) {
Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
} else {
Log.e("MainActivity", result.getError().getMessage());
}
});

首先。这将在构建时给出一个弃用警告和一个IDE警告(Result of 'Observable.subscribe()' is ignored)。

为了解决第一个问题,我在Observable前面添加了Disposable myDisposable =。这是正确的吗?

接下来,我需要添加一个超时,以便在请求超时时显示警告等。为此,我将.timeout(5000, TimeUnit.MILLISECONDS)添加到构建器中。

这是有效的,但是timeouts似乎在Observables上工作的方式是,它们抛出一个异常,我不知道如何捕获和处理该异常。

我现在拥有的是:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.timeout(5000, TimeUnit.MILLISECONDS)
.subscribe(result -> {
if (result.isSuccessful()) {
Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
} else {
Log.e("MainActivity", result.getError().getMessage());
}
});

这构建和运行良好,并且API/deprecation警告不显示,但是当没有网络可用时,正确地超时并抛出未处理的异常。

所以,代码似乎是正确的,但究竟如何添加异常处理来捕获超时TimeoutException抛出?

我已经尝试了很多事情,包括:在整个Observable周围添加try-catch子句-这警告TimeoutException不会被' try '中的代码抛出;并添加错误处理程序。

添加错误处理程序让我最接近,所以下面的代码是我所能得到的:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.timeout(5000, TimeUnit.MILLISECONDS)
.subscribe(result -> {
if (result.isSuccessful()) {
Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
} else {
Log.e("MainActivity", result.getError().getMessage());
}
}, error -> {
runOnUiThread(new Runnable() {
@Override
public void run() {
myTextView.setText(R.string.network_not_available);
}
});
});

这捕获Timeout正确并更新我的UI没有错误,然而,当网络恢复时,似乎可观察对象可能试图返回和抛出空指针异常。

(更新,无论网络是否恢复,这个NPE有时可能会在很短的时间后抛出…但它总是在网络恢复时抛出。)

我得到FATAL EXCEPTION: RxCachedThreadScheduler-1java.lang.NullPointerException: Callable returned a null value. Null values are generally not allowed in 3.x operators and sources.

我需要销毁Observable或其他东西来防止NPE吗?

您需要为您的subscribe调用添加一个onError处理程序:

.subscribe(result -> {
if (result.isSuccessful()) {
Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
} else {
Log.e("MainActivity", result.getError().getMessage());
}
},
error -> {
// handle error here
});

当一个异常导致订阅调用没有onError处理程序时,它将抛出一个OnErrorNotImplementedException,如下所示:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

添加onError处理程序将防止这种情况,而onError处理程序将被调用。

这里有几件事:

首先。这将在构建时给出一个弃用警告和一个IDE警告(忽略'Observable.subscribe()'的结果)。

subscribe()返回一个Disposable。其思想是,当您不再对接收可观察对象的输出感兴趣时,您可以调用一次性对象上的dispose(),然后工作终止。这也可以防止内存泄漏。

作为一个例子,假设你有一个Activity,你启动了一个Observable来运行一个很长的网络查询,最终向Activity UI发布一些内容。如果用户在此任务完成之前就离开了,或者Activity被销毁了,那么您就不再对其输出感兴趣,因为不再有可以发布的UI。所以你可以在onStop()中调用dispose()

所以,代码似乎是正确的,但是究竟如何添加异常处理来捕获抛出的超时TimeoutException ?

subscribe中使用error块是一种选择,但还有其他选择。例如,如果您想继续使用Result类,则可以使用类似onErrorReturn(throwable -> Result.error(throwable))的类。显然,我猜这个类是什么样子的:

.timeout(5000, TimeUnit.MILLISECONDS)
.onErrorReturn(throwable -> Result.errorWithMessage(R.string.network_not_available))
.subscribe(result -> {
if (result.isSuccessful()) {
Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
} else {
myTextView.setText(result.getErrorMessage());
}
});

. lang。NullPointerException: Callable返回一个空值。3中一般不允许使用空值。X操作符和源。

:

wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute()

返回null。你可以这样做:

Observable.fromCallable(() -> {
Result<?> out = wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute();
if(out == null)
out = Result.error(/*Returned null*/);
}
return out;
}