我正在使用以下可观察值来执行常规任务。Observable 在类首次加载到内存中时启动,然后定期执行我的代码。由于它是单个可观察的,因此可以保证[从我的测试来看,这是一个假设]代码永远不会第二次启动并并行处理,以防其运行时间超过间隔。
private static Subscription subscription = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code
return Observable.just(null);
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
.subscribe();
但这也有一个缺点,即 rxjava 会累积延迟的排放,并在延迟迭代完成后快速启动它们。示例:如果计时器被编程为每 1000 毫秒迭代一次,迭代 n 需要 5000 毫秒,则迭代 n+1、n+2、n+3 等将按顺序启动,但一个接一个,并且不遵守计时器间隔。
还不错,但真正是一个问题是当Android睡眠几个小时时会发生什么。因为一旦设备唤醒,rxjava 就会以快速顺序启动所有错过的迭代,这给性能带来了相当大的影响。
如何告诉 rxjava 忘记错过的迭代?如果迭代需要更长的时间,我希望计时器在该迭代完成后启动,或者我想删除错过的迭代并在到期时开始下一次迭代。我尝试使用 sample(( 和其他过滤器,但它不知何故没有给我想要的效果,或者我不知道如何正确应用它们。
请注意,我不想为每次迭代创建一个新的 Observable(我可以为此使用 zip(,因为我想确保代码不是从多个线程执行的。
当您的Android设备处于睡眠模式时,不要要求Rx跳过某些事件。但是停止Rx!
订阅流时,您有一个处理程序来取消订阅此流。
Subscription subscription = Observable.timer(1, SECONDS).subscribe();
在活动的OnPause()
方法中,可以通过调用订阅上的unsubscribe()
方法来停止流。
@Override
public void onPause() {
subscription.unsubscribe();
}
在活动onResume()
方法中,您可以在直播中再次订阅
@Override
public void onResume() {
subscription = Observable.timer(1, SECONDS).subscribe();
}