Using RxJava and Okhttp



我想在另一个线程中使用okhttp请求url(如IO线程)并在Android主线程中获得Response,但我不知道如何创建Observable

Observable.defer()代替Observable.create()更容易、更安全:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

这样就可以为您处理退订和反压。这是Dan Lew关于create()defer()的一篇很棒的文章。

如果你想走Observable.create()路线,那么它应该看起来更像这个库中到处都是isUnsubscribed()调用。我相信这仍然不能处理反压

我知道这篇文章有点老了,但是现在有一种新的更方便的方法来做到这一点

Observable.fromCallable {
        client.newCall(Request.Builder().url("your url").build()).execute()
    }

更多信息:https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

我来晚了,但是,如果由于某种原因代码需要流化响应体,那么deferfromCallable不会这样做。可以使用using操作符。

Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
             response -> { // 2
                 ...
                 return Single.just((Consumer<OutputStream>) fileOutput -> {
                     try (InputStream upstreamResponseStream = response.body().byteStream();
                          OutputStream fileOutput = responseBodyOutput) {
                         ByteStreams.copy(upstreamResponseStream, output);
                     }
                 });
             },
             Response::close, // 3
             false) // 4
      .subscribeOn(Schedulers.io()) // 5
      .subscribe(copier -> copier.accept(...), // 6
                 throwable -> ...); // 7
  1. 第一个lambda在订阅后执行响应
  2. 第二个lambda创建了可观察对象类型,这里是Single.just(...)
  3. 第三个lambda处理响应。对于defer,可以使用try-with-resources样式。
  4. 设置eager开关为false,使处理器在终端事件后被调用,即在订阅消费者被执行之后。
  5. 当然让事情发生在另一个线程池
  6. 这是将使用响应体的lambda。如果没有将eager设置为false,代码将引发IOException,原因为'closed',因为响应在进入此lambda之前已经关闭。
  7. onError lambda应该处理异常,特别是不能再被using操作符捕获的IOException,因为可以使用defer的try/catch。

Okhttp3 with RxSingle后台API调用。

     Disposable disposables = Single.fromCallable(() -> {
        Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
        OkHttpClient client = Util.getHttpClient();
        Request request = new Request.Builder()
                .addHeader("Authorization", "Bearer " + Util.getUserToken())
                .url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
                .build();
        Response response = client.newCall(request).execute();
        if(response.isSuccessful()) {
           ...
           return ; // Any  type
        } else {
           return ; // Any type        
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe((result) -> {
           Log.d(TAG, "api() completed");
      });

    compositeDisposable.add(disposables);

相关内容

  • 没有找到相关文章

最新更新