为什么可观察量不在正确的线程上创建?


Observable observable = Observable.from(backToArray(downloadWebPage("URL")))
.map(new Func1<String[], Pair<String[], String[]>>() {
@Override
public Pair<String[], String[]> call(String[] of) {
return new Pair<>(of,
backToArray(downloadWebPage("URL" + of[0])).get(0));
}
});
observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(
(new Observer<Pair>() {
@Override
public void onCompleted() {
// Update user interface if needed
}
@Override
public void onError(Throwable t) {
// Update user interface to handle error
}
@Override
public void onNext(Pair p) {
offices.add(new Office((String[]) p.first, (String[]) p.second));
}
}));

它运行,我得到android.os.NetworkOnMainThreadException。我希望它运行由 subscribeOn() 方法设置的新线程。

假设实际的网络请求发生在downloadWebPage()中,错误位于代码的第一行:

Observable observable = Observable.from(backToArray(downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode)))

这相当于:

String[] response = downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode)
Observable observable = Observable.from(backToArray(response))

这应该清楚地表明,downloadWebPage是在创建任何Observable之前在主线程上执行的,更不用说订阅了。在这方面,RxJava无法改变Java的语义。

但是,您可以做的是这样的(未经测试,但应该是正确的):

Observable observable = Observable.create(new Observable.OnSubscribe<String[]>() {
@Override
public void call(final Subscriber<? super String[]> subscriber) {
final String[] response = downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode);
if (! subscriber.isUnsubscribed()) {
subscriber.onNext(backToArray(response));
subscriber.onCompleted();
}
}
)

现在,您的网络请求将仅在订阅Observable发生,并将移动到您在subscribeOn()中指定的线程。

您可以使用 defer() 将downloadWebPage的调用推迟到订阅可观察量的那一刻。

例:

private Object slowBlockingMethod() { ... }
public Observable<Object> newMethod() {
return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

你应该从

**observable.subscribeOn(Schedulers.newThread())**

**observable.subscribeOn(Schedulers.io())**

相关内容

  • 没有找到相关文章