RxJava 2 - 处理时省略间隔排放



假设我有以下任务链要定期执行。

Observable.interval(0, 10, TimeUnit.SECONDS)
.flatMapIterable(l -> provider.getAssets())
.map(renderer::render)
.map(converter::convert)
.subscribe(provider::publish)

尽管每个任务可能需要 10 秒以上才能完成。我想每 10 秒调用一次getAssets(),处理,但不想赶上自处理开始以来发生的排放。如何省略它们?谢谢。

我认为您可以通过添加"进行中的标志"来解决此问题。在您的情况下,它看起来像这样:

AtomicBoolean inProgress = new AtomicBoolean(false);
Observable.interval(0, 10, TimeUnit.SECONDS)
.filter(emission -> !inProgress.get())
.flatMapIterable(l -> provider.getAssets())
.map(renderer -> {
// set to true when processing started
inProgress.set(true);
return renderer.render();
})
.map(converter::convert)
.subscribe(provider -> { 
// set to false when processing finished
provider.publish();
inProgress.set(false);
});

事实证明,我追求的是背压运算符,特别是onBackpressureDrop省略最新值。

Flowable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);

相关内容

  • 没有找到相关文章

最新更新