我有一个可观察的序列。插入第一个元素时,我想启动一个计时器,并在计时器的时间跨度内批量后续插入的元素。然后,计时器不会再次启动,直到在序列中插入另一个元素。
--------|=====timespan====|---------------|=====timespan====|-------------->
1 2 3 4 5 6
将产生:
[1,2,3,4,5], [6]
我尝试了Observable.buffer()
和timespan
但从我的实验中,我可以看到计时器在我们订阅可观察序列后立即启动,并在前一个计时器完成后立即重新启动。
因此,具有与前面示例相同的序列并将buffer()
与timespan
一起使用,我会有这样的东西:
|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
1 2 3 4 5 6
这将产生这个:
[1,2,3,4], [], [5,6], []
这本质上与29858974的问题相同,但针对的是Java。
所以问题是,由于我不想过多地延迟我的流,我希望有一个非常短的计时器,并且该计时器将非常密集。我可以简单地过滤空列表,但我认为这对 CPU 的影响太大了。
window
运算符将充当buffer
,您不能直接使用它。
这个想法是通过第一个可观测量的排放来控制timer
(我称之为insertions
(。为此,您必须包含第三个参数来链接两个可观察量(stopWatch
解决方案中的主题(。
@Test
public void stop_watch_observable() {
Subject<Long> stopWatch = PublishSubject.create();
Observable<Long> insertions = insertions();
//share to use it as a timer (looking for the first emission)
//and to recieve the items
Observable<Long> shared = insertions.share();
//for each emission of insertions we start a new timer
//but only the first one is emitted
//the others are stopped by the takeUntil(stopWatch)
Observable<Long> window = shared
.flatMap(e -> Observable.timer(3, TimeUnit.SECONDS).takeUntil(stopWatch));
shared.buffer(window)
//each time a window is generated we kill all the current timers
.doOnNext(e -> stopWatch.onNext(0L))
.blockingSubscribe(System.out::println);
}
// insertions generator which is comming randomly
private Observable<Long> insertions() {
AtomicLong al = new AtomicLong(0);
return Observable.generate((Emitter<Long> emitter) -> {
if (al.getAndIncrement() % 4 == 0) {
Long timeToWait = Long.parseLong(RandomStringUtils.randomNumeric(1));
System.out.println("sleeping for: " + timeToWait);
sleep(timeToWait * 1000);
} else {
sleep(500);
}
emitter.onNext(al.get());
}).subscribeOn(Schedulers.newThread());
}
第一种解决方案的缺点是每次插入都会启动一个timer
(可能是 CPU 密集型的(。在这里,另一个只有一个计时器的解决方案在一次启动(我认为这样更有效:
@Test
public void stop_watch_observable() {
Observable<Long> insertions = insertions();
Observable<Long> shared = insertions.share();
AtomicBoolean timerOn = new AtomicBoolean(false);
Observable<Long> window = shared
.flatMap(e -> timerOn.get() ? Observable.empty() : Observable.timer(3, TimeUnit.SECONDS)
.doOnSubscribe(x -> timerOn.set(true))
);
shared.buffer(window)
//each time a window is generated we kill all the current timers
.doOnNext(e -> timerOn.set(false))
.blockingSubscribe(System.out::println);
}