如何从observable中停止interval



我是rxjava上的新用户,我想每2秒执行一次轮询任务50次,如果在任务中满足某些条件,它也可能终止,我试图使用Observable.interval,但我发现除了抛出异常之外没有办法终止它,是否有任何其他操作符来满足我的目标?顺便说一句,这个功能作为API来提供可观察对象,所以我不能通过unscribe来控制订阅者和终止。

Observable.interval(timeout, interval, TimeUnit.SECONDS)
.flatmap(task - > task)

我猜Observable.takeUntil(stopPredicate)Observable.takeWhile(predicate)可以帮助你:

Observable.interval(timeout, interval, TimeUnit.SECONDS) 
.takeWhile(val -> val < 42)

这里的observable将在第42次尝试时终止

可以使用takeUntil停止Observable.interval,如下所示:

Observable.interval(0, 1, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .takeUntil(new Predicate<Long>() {
                @Override
                public boolean test(Long aLong) throws Exception {
                    return aLong == 10;
                }
            })
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG, "Tick: " + aLong);
                }
            });

在这个例子中,observable会在10秒后停止运行。

下面的代码将产生以下输出:

Starting interval..
Next tick is: 1
Next tick is: 2
Next tick is: 3
Next tick is: 4
Next tick is: 5
Interval complete

如果取消这一行的注释:"//If (t==3) dispose();"它将只打印以下内容:

Starting interval..
Next tick is: 1
Next tick is: 2
Next tick is: 3

代码:

         Observable.interval(1, TimeUnit.SECONDS).take(5).subscribeWith(new DisposableObserver<Long>() {
        @Override public void onStart() {                
            // do whatever might be needed here to run once before
            // the interval begins
            System.out.println("Starting interval..");
        }
        @Override public void onNext(Long t) {
            System.out.println("Next tick is: "+t);
            // if( t==3 ) dispose();
        }
        @Override public void onError(Throwable t) {
            t.printStackTrace();
        }
        @Override public void onComplete() {
            // will be called once when all 5 ticks are completed
            // will NOT be called if dispose() has been called
               System.out.println("Interval complete");
        }
    });

相关内容

  • 没有找到相关文章

最新更新