假设我有以下任务链要定期执行。
Observable.interval(0, 10, TimeUnit.SECONDS)
.flatMapIterable(l -> provider.getAssets())
.map(renderer::render)
.map(converter::convert)
.subscribe(provider::publish)
尽管每个任务可能需要 10 秒以上才能完成。我想每 10 秒调用一次getAssets()
,处理,但不想赶上自处理开始以来发生的排放。如何省略它们?谢谢。
我认为您可以通过添加"进行中的标志"来解决此问题。在您的情况下,它看起来像这样:
AtomicBoolean inProgress = new AtomicBoolean(false);
Observable.interval(0, 10, TimeUnit.SECONDS)
.filter(emission -> !inProgress.get())
.flatMapIterable(l -> provider.getAssets())
.map(renderer -> {
// set to true when processing started
inProgress.set(true);
return renderer.render();
})
.map(converter::convert)
.subscribe(provider -> {
// set to false when processing finished
provider.publish();
inProgress.set(false);
});
事实证明,我追求的是背压运算符,特别是onBackpressureDrop
省略最新值。
Flowable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);