我想在另一个线程中使用okhttp请求url(如IO线程)并在Android主线程中获得Response
,但我不知道如何创建Observable
。
用Observable.defer()
代替Observable.create()
更容易、更安全:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
这样就可以为您处理退订和反压。这是Dan Lew关于create()
和defer()
的一篇很棒的文章。
如果你想走Observable.create()
路线,那么它应该看起来更像这个库中到处都是isUnsubscribed()
调用。我相信这仍然不能处理反压
我知道这篇文章有点老了,但是现在有一种新的更方便的方法来做到这一点
Observable.fromCallable {
client.newCall(Request.Builder().url("your url").build()).execute()
}
更多信息:https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
我来晚了,但是,如果由于某种原因代码需要流化响应体,那么defer
或fromCallable
不会这样做。可以使用using
操作符。
Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
response -> { // 2
...
return Single.just((Consumer<OutputStream>) fileOutput -> {
try (InputStream upstreamResponseStream = response.body().byteStream();
OutputStream fileOutput = responseBodyOutput) {
ByteStreams.copy(upstreamResponseStream, output);
}
});
},
Response::close, // 3
false) // 4
.subscribeOn(Schedulers.io()) // 5
.subscribe(copier -> copier.accept(...), // 6
throwable -> ...); // 7
- 第一个lambda在订阅后执行响应。 第二个lambda创建了可观察对象类型,这里是
- 第三个lambda处理响应。对于
defer
,可以使用try-with-resources样式。 - 设置
eager
开关为false
,使处理器在终端事件后被调用,即在订阅消费者被执行之后。 - 当然让事情发生在另一个线程池
- 这是将使用响应体的lambda。如果没有将
eager
设置为false
,代码将引发IOException,原因为'closed',因为响应在进入此lambda之前已经关闭。 -
onError
lambda应该处理异常,特别是不能再被using
操作符捕获的IOException
,因为可以使用defer
的try/catch。
Single.just(...)
Okhttp3 with RxSingle后台API调用。
Disposable disposables = Single.fromCallable(() -> {
Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
OkHttpClient client = Util.getHttpClient();
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + Util.getUserToken())
.url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
.build();
Response response = client.newCall(request).execute();
if(response.isSuccessful()) {
...
return ; // Any type
} else {
return ; // Any type
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((result) -> {
Log.d(TAG, "api() completed");
});
compositeDisposable.add(disposables);