人造人.RxJava 2:并行多个网络调用



我需要用RxJava发出两个并行的请求。为此,我使用zip运算符。这是我的代码:

public Disposable getBooksAndAuthors(String id, ReuqestCallback requestCallback) {
return singleRequest(Single.zip(
getBooks(id).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()),
getAuthors(id).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()),
(book, author) -> new ZipResponseWrapper(book, author).getResponse()), requestCallback);
}
private <T extends NetworkResponse> Disposable singleRequest(Single<T> single, RequestCallback requestCallback) {
return single.doOnSubscribe(d -> requestCallback.onStartRequest())
.doOnSuccess(s -> requestCallback.onSuccess(s))
.doOnError(ErrorConsumer.consume((t) -> requestCallback.onError(t)))
.doFinally(() -> requestCallback.onFinish())
.subscribe();
}

但是我不明白如何分别接收每个请求的响应。也就是说,如果第一个请求得到答案,我需要立即显示从该请求收到的数据,而不是等待对第二个请求的响应。在第二个请求的答案到达后,显示第二个请求上收到的数据。这是必要的,因为第二个请求需要很长时间。请帮助我。

下面是如何使用每个函数的响应来处理它的示例:

val disposable = Observable.zip(
firstNetworkCall().subscribeOn(Schedulers.io()),
secondNetworkCall().subscribeOn(Schedulers.io()),
BiFunction{ 
firstResonse: ResponseOneType, 
secondResponse: ResponseTwoType -> 
combineResult(firstResponse, secondResponse) }))
.observeOn(AndroidSchedulers.mainThread())
.subscribe { it -> doSomethingWithIndividualResponse(it) }

我的建议(虽然在 Kotlin 中(:

val id = 0L

Observables.combineLatest(
getBooks(id).startWith(emptyList<Book>()).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()),
getAuthor(id).startWith(emptyList<Author>()).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())
) { book: List<Book>, author: List<Author> ->
Pair(book, author)
}.skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { (books: List<Book>, authors: List<Author>) ->
view.show(books)
view.show(authors)
}

最新更新