我有一段代码,他的工作是更新本地缓存。此缓存更新有两个触发器:
- 以固定的间隔
- 当要求时
所以这里有一个关于我是如何做到这一点的基本例子。
forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
.merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData(); // operation that may take long
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
然后后来我有
public void forceReload() {
final CountDownLatch cdl = new CountDownLatch(1);
dataUpdates
.take(1)
.subscribe(
new Action1<Boolean>() {
@Override
public void call(Boolean b) {
cdl.countDown();
}
}
);
forceReloadEvents.onNext(-1L);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
这有效,但问题是当我开始对forceReload()
进行多个并发调用时:不会并发执行reloadData()
但元素将排队,并且进程将在重新加载数据时循环,直到发送到forceReloadEvents
的所有事件都被消耗,即使由于先前的事件释放CountDownLatch
forceReload()
已经完成。
我想使用onBackPressureDrop
但似乎没有诱导背压,也没有掉落任何东西。我想要的是某种强制背压的方法,以便合并了解一次只能处理一个元素,并且必须删除任何后续事件,直到当前执行完成。
我也考虑过使用 buffer
或 throttleFirst
,但我不想在每个事件之间强制使用特定时间,我宁愿根据重新加载缓存所需的时间进行这种自动缩放。您可以将其视为throttleFirst
,直到reloadData
完成。
编辑:根据注释,您可以在flatMap中将AtomicBoolean作为门,以便在门再次打开之前不开始重新加载:
public class AvoidReloadStorm {
static Observable<Boolean> reload() {
return Observable.just(true)
.doOnNext(v -> System.out.println("Reload started..."))
.delay(10, TimeUnit.SECONDS)
.doOnNext(v -> System.out.println("Reloaded"));
}
public static void main(String[] args) throws Exception {
Subject<Long, Long> manual = PublishSubject.<Long>create().toSerialized();
Observable<Long> timer = Observable.timer(0, 5, TimeUnit.SECONDS)
.doOnNext(v -> System.out.println("Timer reload"));
AtomicBoolean running = new AtomicBoolean();
ConnectableObservable<Boolean> src = Observable
.merge(manual.onBackpressureDrop(), timer.onBackpressureDrop())
.observeOn(Schedulers.io())
.flatMap(v -> {
if (running.compareAndSet(false, true)) {
return reload().doOnCompleted(() -> {
running.set(false);
});
}
System.out.println("Reload rejected");
return Observable.empty();
}).publish();
src.subscribe(System.out::println);
src.connect();
Thread.sleep(100000);
}
}
我做了这项工作,这要归功于 akarnokd!
这是我根据他的回答创建的解决方案:
Observable<Long> forceReloadEvents = this.forceReloadEvents
.asObservable()
.onBackpressureDrop();
Observable<Long> periodicReload = Observable
.timer(0, pullInterval, TimeUnit.SECONDS)
.onBackpressureDrop();
final AtomicBoolean running = new AtomicBoolean();
dataUpdates = Observable
.merge(forceReloadEvents, periodicReload)
.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long t) {
return running.compareAndSet(false, true);
}
})
.observeOn(Schedulers.io())
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData();
}
})
.doOnNext(new Action1<Boolean>() {
@Override
public void call(Boolean t) {
running.set(false);
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
我不确定onBackpressureDrop
在这里有用,但我将其设置为预防措施。
强制重新加载代码不会更改。