如何"force"某种背压以避免在 rxjava 中多次执行?



我有一段代码,他的工作是更新本地缓存。此缓存更新有两个触发器:

  1. 以固定的间隔
  2. 当要求时

所以这里有一个关于我是如何做到这一点的基本例子。

forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
    .merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
    .flatMap(new Func1<Long, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Long t) {
            return reloadData(); // operation that may take long
        }
    })
    .publish();
dataUpdates.subscribe();
dataUpdates.connect();

然后后来我有

public void forceReload() {
    final CountDownLatch cdl = new CountDownLatch(1);
    dataUpdates
        .take(1)
        .subscribe(
            new Action1<Boolean>() {
                @Override
                public void call(Boolean b) {
                    cdl.countDown();
                }
            }
        );
    forceReloadEvents.onNext(-1L);
    try {
        cdl.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

这有效,但问题是当我开始对forceReload()进行多个并发调用时:不会并发执行reloadData()但元素将排队,并且进程将在重新加载数据时循环,直到发送到forceReloadEvents的所有事件都被消耗,即使由于先前的事件释放CountDownLatch forceReload()已经完成。

我想使用onBackPressureDrop但似乎没有诱导背压,也没有掉落任何东西。我想要的是某种强制背压的方法,以便合并了解一次只能处理一个元素,并且必须删除任何后续事件,直到当前执行完成。

我也考虑过使用 bufferthrottleFirst,但我不想在每个事件之间强制使用特定时间,我宁愿根据重新加载缓存所需的时间进行这种自动缩放。您可以将其视为throttleFirst,直到reloadData完成。

编辑:根据注释,您可以在flatMap中将AtomicBoolean作为门,以便在门再次打开之前不开始重新加载:

public class AvoidReloadStorm {
    static Observable<Boolean> reload() {
        return Observable.just(true)
        .doOnNext(v -> System.out.println("Reload started..."))
        .delay(10, TimeUnit.SECONDS)
        .doOnNext(v -> System.out.println("Reloaded"));
    }
    public static void main(String[] args) throws Exception {
        Subject<Long, Long> manual = PublishSubject.<Long>create().toSerialized();
        Observable<Long> timer = Observable.timer(0, 5, TimeUnit.SECONDS)
                .doOnNext(v -> System.out.println("Timer reload"));
        AtomicBoolean running = new AtomicBoolean();
        ConnectableObservable<Boolean> src = Observable
        .merge(manual.onBackpressureDrop(), timer.onBackpressureDrop())
        .observeOn(Schedulers.io())
        .flatMap(v -> {
            if (running.compareAndSet(false, true)) {
                return reload().doOnCompleted(() -> {
                    running.set(false);
                });
            }
            System.out.println("Reload rejected");
            return Observable.empty();
        }).publish();
        src.subscribe(System.out::println);
        src.connect();
        Thread.sleep(100000);
    }
}

我做了这项工作,这要归功于 akarnokd!

这是我根据他的回答创建的解决方案:

Observable<Long> forceReloadEvents = this.forceReloadEvents
    .asObservable()
    .onBackpressureDrop();
Observable<Long> periodicReload = Observable
    .timer(0, pullInterval, TimeUnit.SECONDS)
    .onBackpressureDrop();
final AtomicBoolean running = new AtomicBoolean();
dataUpdates = Observable
    .merge(forceReloadEvents, periodicReload)
    .filter(new Func1<Long, Boolean>() {
        @Override
        public Boolean call(Long t) {
            return running.compareAndSet(false, true);
        }
    })
    .observeOn(Schedulers.io())
    .flatMap(new Func1<Long, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Long t) {
            return reloadData();
        }
    })
    .doOnNext(new Action1<Boolean>() {
        @Override
        public void call(Boolean t) {
            running.set(false);
        }
    })
    .publish();
dataUpdates.subscribe();
dataUpdates.connect();

我不确定onBackpressureDrop在这里有用,但我将其设置为预防措施。

强制重新加载代码不会更改。

相关内容

  • 没有找到相关文章

最新更新