我是rxjava上的新用户,我想每2秒执行一次轮询任务50次,如果在任务中满足某些条件,它也可能终止,我试图使用Observable.interval
,但我发现除了抛出异常之外没有办法终止它,是否有任何其他操作符来满足我的目标?顺便说一句,这个功能作为API来提供可观察对象,所以我不能通过unscribe来控制订阅者和终止。
Observable.interval(timeout, interval, TimeUnit.SECONDS)
.flatmap(task - > task)
我猜Observable.takeUntil(stopPredicate)
或Observable.takeWhile(predicate)
可以帮助你:
Observable.interval(timeout, interval, TimeUnit.SECONDS)
.takeWhile(val -> val < 42)
这里的observable将在第42次尝试时终止
可以使用takeUntil
停止Observable.interval
,如下所示:
Observable.interval(0, 1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.takeUntil(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return aLong == 10;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "Tick: " + aLong);
}
});
在这个例子中,observable会在10秒后停止运行。
下面的代码将产生以下输出:
Starting interval..
Next tick is: 1
Next tick is: 2
Next tick is: 3
Next tick is: 4
Next tick is: 5
Interval complete
如果取消这一行的注释:"//If (t==3) dispose();"它将只打印以下内容:
Starting interval..
Next tick is: 1
Next tick is: 2
Next tick is: 3
代码:
Observable.interval(1, TimeUnit.SECONDS).take(5).subscribeWith(new DisposableObserver<Long>() {
@Override public void onStart() {
// do whatever might be needed here to run once before
// the interval begins
System.out.println("Starting interval..");
}
@Override public void onNext(Long t) {
System.out.println("Next tick is: "+t);
// if( t==3 ) dispose();
}
@Override public void onError(Throwable t) {
t.printStackTrace();
}
@Override public void onComplete() {
// will be called once when all 5 ticks are completed
// will NOT be called if dispose() has been called
System.out.println("Interval complete");
}
});