我正在尝试使用背压来处理数据分页。我有一个可观察到的可观察到的,向每个请求(n)请求服务器的新很多数据。用户单击按钮后,我希望获取新的数据。
我的主要目标是通过可观察的合同替换此API
interface Api {
void openConnection(); //subscription
boolean hasNextPage(); //completion
Single<Data> nextPage();//onNext and onError
void closeConnection(); //unsubscription
}
我认为拉链在这里会很棒
Observable.zip(dataFromServer(), userActions(), (data, ø) -> data).subscribe(showData());
但是,由于rxringbuffer的zip运算符内部的M大小,订阅立即请求服务器中的数据块后,上层结构。这是完全不合适的。即使我可以将缓冲尺寸更改为1(但我不能)上层结构的行为是不合适的。将在第一个用户操作之前请求数据,但我希望在第一个用户操作之后首先要求它。看来我的方法是完全错误的。
我已经设法完成了适当的行为。
//model of user actions
val userActions = Observable.interval(1000, TimeUnit.MILLISECONDS).share();//user clicks every second//
//model of my observable with data which supports backpressure
val dataFromServer = Observable.range(1, 10);//data has 10 pages//
dataFromServer
.doOnNext(it -> System.out.println("Received from server" + it))
.flatMap(it -> userActions.take(1).ignoreElements().startWith(it), 1)
.doOnNext(it -> System.out.println("Before data showing" + it))
.subscribe(showData());