如何在错误情况下组合多个 RxJava 链非阻塞



我的要求:

  • N 并行改造呼叫
  • 等待所有调用完成(成功或失败)
  • 如果 k (0<= k
  • 可选:如果 RxJava 有一个简单的方法来区分哪个调用是每个成功或失败的哪个,那就太好了,如果没有,我将解析响应并自己弄清楚

我有什么:

Observable<ResponseBody> api1Call = api1.fetchData();
Observable<ResponseBody> api2Call = api2.fetchData();
Observable<ResponseBody> api3Call = api3.fetchData();
Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
@Override
public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
return null;
}
}).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
Logger.e(throwable, "some error with one of the apis?");
return Observable.empty();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Logger.i("onCompleted");
}
@Override
public void onError(Throwable e) {
Logger.e(e, "onError");
}
@Override
public void onNext(Object o) {
Logger.i("onNext " + o);
}
});

我得到的输出:

some error with one of the apis?
// stacktrace of the error
onCompleted

我是RxJava的新手,非常困惑。我在StackOverflow上找到了一些答案,说zip做类似的事情,但它离我的要求更远。我猜其中一个"组合"运算符 + 适当的异常处理将满足我的需要。到目前为止,真的很难弄清楚

我正在使用的版本:

compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
  1. 你不能通过combineLastzip实现并行,rxjava在我的测试中会按顺序执行和发出你的项目。

  2. 如果其中一个任务失败,则不会调用您的Func2#call,而是提交onError。您甚至无法以这种方式获得其他成功任务的结果。

  3. 解决方案是flatMap,这是在null0中实现并发的传统方式。它还满足您的其他要求。

下面是一个小但完整的示例。

我使用一个简单的网站服务来测试。

我使用Semaphore等待所有任务完成,您可以完全忽略它。并且我将日志记录添加到http请求中以更好地理解,您也可以完全忽略它。

public interface WebsiteService {
@GET
Observable<ResponseBody> website(@Url String url);
}

然后我使用以下内容来测试结果rxjava.

HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
.build();
WebsiteService websiteService = retrofit.create(WebsiteService.class);
final Semaphore s = new Semaphore(1);
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable<ResponseBody> first = websiteService.website("http://github.com");
Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
Observable<ResponseBody> third = websiteService.website("http://notexisting.com");
final int numberOfCalls = 3; // testing for three calls
Observable.just(first, second, third)
.flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
@Override
public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
return responseBodyObservable.subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<ResponseBody>() {
private int currentDoneCalls = 0;
private void checkShouldReleaseSemaphore() {
if (currentDoneCalls >= numberOfCalls) {
s.release();
}
}
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ResponseBody responseBody) {
System.out.println("Retrofit call success " + responseBody.contentType());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Retrofit call failed " + e.getMessage());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onComplete() {
System.out.println("onComplete, All request success");
checkShouldReleaseSemaphore();
}
});
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("All request done");
s.release();
}

我使用rxjava2和改造adapter-rxjava2进行测试。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'

更新

来自github的RxJava2的介绍页面指出了实现paralellism的实用方法。

实际上,RxJava 中的平行主义意味着运行独立的流并将其结果合并回单个流。操作员flatMap执行此操作...

虽然这个例子是基于RxJava2,但运算flatMap是 已经存在于RxJava.

我认为在您的用例 Zip 运算符中它更合适

在这里,您可以看到在主线程中运行,但是如果您使用observerOn,也可以使其在另一个线程中运行它们中的每一个

/**
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                  .concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}

private Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
private Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
private Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}

您可以在此处查看更多示例 https://github.com/politrons/reactive

您可以使用Observable.mergeDelayError(api1Call, api2Call, api3Call).

奖励:您还可以指定可以同时运行的最大并行调用数。例如:

ResponseBody0 .

对于 Retrofit
  1. 中的多个并行调用,我们需要在初始化 Retrofit 时从 OkHttp 层进行设置。 请检查此内容。
  2. 如果在这种情况下使用combineLatestzip运算符(对于改造呼叫),则每个调用仅发出一个项目。因此,两个运营商都将等待所有呼叫完成。所以,我们不需要担心这一点。有关更多信息,请查看 combineLatest 和 zip。
  3. 如果您的意思是1 call fail关于 RxJava 流错误,则会抛出此错误,不会发出任何组合项。但是1 call failhttp request fail,当 3 个调用完成时,流总是发出一个项目。我们不能在这里使用combineLastzip运算符。

感谢@TinTran和这个,这是正确的解决方案:
(我现在无法为改造可观察量提供确切的语法,但这应该没关系,逻辑保持不变 改造与否)

Observable.mergeDelayError(getData1(), getData2()).doAfterTerminate(new Action0() {
@Override
public void call() {
Logger.i("end of all streams");
tvTheText.setText("all streams finished");
}
}).subscribe(new PrintSubscriber<>("merge" +
" delay w error"));

可观察量(改造的应以相同的方式工作):

private Observable<String> getData1() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> singleSubscriber) {
try {
long responseTime = 120 + new Random().nextInt(30);
Thread.sleep(responseTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
singleSubscriber.onNext("data 1");
singleSubscriber.onCompleted();
}
}).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
}
private Observable<String> getData2() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> singleSubscriber) {
try {
long responseTime = 100 + new Random().nextInt(19);
Thread.sleep(responseTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
singleSubscriber.onError(new Exception());// this one never blocks the other Observables' streams
}
}).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
}

输出日志:

10-24 15:27:23.335 D: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.335 D: │ Thread: main
10-24 15:27:23.335 D: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.335 D: │ SafeSubscriber.onNext  (SafeSubscriber.java:134)
10-24 15:27:23.335 D: │    PrintSubscriber.onNext  (PrintSubscriber.java:32)
10-24 15:27:23.335 D: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.336 D: │ merge delay w error - onNext - data 1
10-24 15:27:23.336 D: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.342 V: ⇢ onError(e=java.lang.Exception)
10-24 15:27:23.342 V: ⇠ onError [0ms]
10-24 15:27:23.343 I: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.343 I: │ Thread: main
10-24 15:27:23.343 I: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.343 I: │ OperatorDoAfterTerminate$1.callAction  (OperatorDoAfterTerminate.java:73)
10-24 15:27:23.343 I: │    MainActivity$1.call  (MainActivity.java:37)
10-24 15:27:23.343 I: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.344 I: │ end of all streams
10-24 15:27:23.344 I: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────

最新更新