我需要用RxJava发出两个并行的请求。为此,我使用zip运算符。这是我的代码:
public Disposable getBooksAndAuthors(String id, ReuqestCallback requestCallback) {
return singleRequest(Single.zip(
getBooks(id).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()),
getAuthors(id).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()),
(book, author) -> new ZipResponseWrapper(book, author).getResponse()), requestCallback);
}
private <T extends NetworkResponse> Disposable singleRequest(Single<T> single, RequestCallback requestCallback) {
return single.doOnSubscribe(d -> requestCallback.onStartRequest())
.doOnSuccess(s -> requestCallback.onSuccess(s))
.doOnError(ErrorConsumer.consume((t) -> requestCallback.onError(t)))
.doFinally(() -> requestCallback.onFinish())
.subscribe();
}
但是我不明白如何分别接收每个请求的响应。也就是说,如果第一个请求得到答案,我需要立即显示从该请求收到的数据,而不是等待对第二个请求的响应。在第二个请求的答案到达后,显示第二个请求上收到的数据。这是必要的,因为第二个请求需要很长时间。请帮助我。
下面是如何使用每个函数的响应来处理它的示例:
val disposable = Observable.zip(
firstNetworkCall().subscribeOn(Schedulers.io()),
secondNetworkCall().subscribeOn(Schedulers.io()),
BiFunction{
firstResonse: ResponseOneType,
secondResponse: ResponseTwoType ->
combineResult(firstResponse, secondResponse) }))
.observeOn(AndroidSchedulers.mainThread())
.subscribe { it -> doSomethingWithIndividualResponse(it) }
我的建议(虽然在 Kotlin 中(:
val id = 0L
Observables.combineLatest(
getBooks(id).startWith(emptyList<Book>()).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()),
getAuthor(id).startWith(emptyList<Author>()).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())
) { book: List<Book>, author: List<Author> ->
Pair(book, author)
}.skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { (books: List<Book>, authors: List<Author>) ->
view.show(books)
view.show(authors)
}