目前我正在尝试使用RxJava实现服务器轮询,我已经对如何在收到服务器响应后重复整个链做了一些研究,我尝试过使用repeat((,它可以工作,但没有那么完美,原因是它多次调用api,服务器在发送到客户端之前需要额外的时间来处理数据,但是我们不知道确切的时间,所以我们不能用repeatWhen((来给出具体的时间。我唯一能使用的就是等待api响应。
任何建议都将不胜感激!
以下是代码片段:
retrofitService.requestPolling()
.repeat() // do not wait to call server so many times
.takeUntil(new Func1<PollResponse, Boolean>() {
@Override
public Boolean call(PollResponse pollResponse) {
return pollResponse.mComplete;
}
})
.doOnNext(new Action1<FlightSearchPollResponse>() {
@Override
public void call(pollResponse pollResponse) {
// update UI here
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<PollResponse>() {
@Override
public void onCompleted() {
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(PollResponse pollResponse) {
} );
编辑:我是RxJava的新手,刚刚获得了这个名为BackPressure的主题,有很多文章解释了如何处理它,因为我不想缓存这个响应,似乎Subject是一个很好的选择,它可以让你控制何时拉。
http://akarnokd.blogspot.com/2015/06/subjects-part-1.html
感谢@Gary LO
应该有很多方法。我想分享其中一个。
- 创建单独的信号流
PublishSubject pollingSignal
- 将信号转换为api调用
-
发布信号以再次执行此操作。
final PublishSubject<Boolean> pollingSignal = PublishSubject.create(); final Observable<PollResponse> apiResponse = retrofitService.requestPolling(); pollingSignal .flatMap(x -> apiResponse) .subscribe(new Observer<PollResponse>() { @Override public void onCompleted() {} @Override public void onError(Throwable throwable) {} @Override public void onNext(PollResponse integer) { // start the next polling pollingSignal.onNext(true); } }); // start the first polling pollingSignal.onNext(true);
玩得开心!
备注使用PublishSubject<Boolean>
而不是PublishSubject<Void>
是因为我觉得使用pollingSignal.onNext(null)
不舒服。
但在Kotlin,我可以使用PublishSubject<Unit>
和pollingSignal.onNext(Unit)