以可观察的顺序对元素进行 RxGroup/Batch 突发



我有一个可观察的序列。插入第一个元素时,我想启动一个计时器,并在计时器的时间跨度内批量后续插入的元素。然后,计时器不会再次启动,直到在序列中插入另一个元素。

--------|=====timespan====|---------------|=====timespan====|-------------->
1  2 3 4    5                     6 

将产生:

[1,2,3,4,5], [6] 

我尝试了Observable.buffer()timespan但从我的实验中,我可以看到计时器在我们订阅可观察序列后立即启动,并在前一个计时器完成后立即重新启动。

因此,具有与前面示例相同的序列并将buffer()timespan一起使用,我会有这样的东西:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
1  2 3 4                          5 6           

这将产生这个:

[1,2,3,4], [], [5,6], []

这本质上与29858974的问题相同,但针对的是Java。

所以问题是,由于我不想过多地延迟我的流,我希望有一个非常短的计时器,并且该计时器将非常密集。我可以简单地过滤空列表,但我认为这对 CPU 的影响太大了。

window运算符将充当buffer,您不能直接使用它。

这个想法是通过第一个可观测量的排放来控制timer(我称之为insertions(。为此,您必须包含第三个参数来链接两个可观察量(stopWatch解决方案中的主题(。

@Test
public void stop_watch_observable() {
Subject<Long> stopWatch = PublishSubject.create();
Observable<Long> insertions = insertions();
//share to use it as a timer (looking for the first emission)
//and to recieve the items
Observable<Long> shared = insertions.share();
//for each emission of insertions we start a new timer
//but only the first one is emitted
//the others are stopped by the takeUntil(stopWatch)
Observable<Long> window = shared
.flatMap(e -> Observable.timer(3, TimeUnit.SECONDS).takeUntil(stopWatch));
shared.buffer(window)
//each time a window is generated we kill all the current timers
.doOnNext(e -> stopWatch.onNext(0L))
.blockingSubscribe(System.out::println);
}
// insertions generator which is comming randomly
private Observable<Long> insertions() {
AtomicLong al = new AtomicLong(0);
return Observable.generate((Emitter<Long> emitter) -> {
if (al.getAndIncrement() % 4 == 0) {
Long timeToWait = Long.parseLong(RandomStringUtils.randomNumeric(1));
System.out.println("sleeping for: " + timeToWait);
sleep(timeToWait * 1000);
} else {
sleep(500);
}
emitter.onNext(al.get());
}).subscribeOn(Schedulers.newThread());
}

第一种解决方案的缺点是每次插入都会启动一个timer(可能是 CPU 密集型的(。在这里,另一个只有一个计时器的解决方案在一次启动(我认为这样更有效:

@Test
public void stop_watch_observable() {
Observable<Long> insertions = insertions();
Observable<Long> shared = insertions.share();
AtomicBoolean timerOn = new AtomicBoolean(false);
Observable<Long> window = shared
.flatMap(e -> timerOn.get() ? Observable.empty() : Observable.timer(3, TimeUnit.SECONDS)
.doOnSubscribe(x -> timerOn.set(true))
);
shared.buffer(window)
//each time a window is generated we kill all the current timers
.doOnNext(e -> timerOn.set(false))
.blockingSubscribe(System.out::println);
}

相关内容

  • 没有找到相关文章

最新更新