有没有办法在运行时更改 Flowable.interval 周期?
LOGGER.info("Start generating bullshit for 7 seconds:");
Flowable.interval(3, TimeUnit.SECONDS)
.map(tick -> random.nextInt(100))
.subscribe(tick -> LOGGER.info("tick = " + tick));
TimeUnit.SECONDS.sleep(7);
LOGGER.info("Change interval to 2 seconds:");
我有一个解决方法,但最好的方法是创建一个新运算符。
此解决方案如何工作?
您有一个触发器源,它将提供值,何时开始开始新的间隔。源是 switchMap,间隔作为内部流。内部流采用上游源的输入值来设置新的间隔时间。
开关地图
当源发出时间(长(时,将调用开关映射 lambda,并且将立即订阅返回的 Flowable。当新值到达 switchMap 时,内部订阅的 Flowable 间隔将被取消订阅,并且 lambda 将再次被调用。返回的Inverval-Flowable将被重新订阅。
这意味着,在每次从源发出时,都会创建一个新的 Inveral。
它的行为如何?
当内部流已订阅并即将发出新值并且从源发出新值时,将取消订阅内部流 (inverval(。因此,不再发出该值。新的间隔可流动已订阅,并将向其配置发出值。
溶液
lateinit var scheduler: TestScheduler
@Before
fun init() {
scheduler = TestScheduler()
}
@Test
fun `62232235`() {
val trigger = PublishSubject.create<Long>()
val switchMap = trigger.toFlowable(BackpressureStrategy.LATEST)
// make sure, that a value is emitted from upstream, in order to make sure, that at least one interval emits values, when the upstream-sources does not provide a seed value.
.startWith(3)
.switchMap {
Flowable.interval(it, TimeUnit.SECONDS, scheduler)
.map { tick: Long? ->
tick
}
}
val test = switchMap.test()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
test.assertValues(0, 1, 2)
// send new onNext value at absolute time 10
trigger.onNext(10)
// the inner stream is unsubscribed and a new stream with inverval(10) is subscribed to. Therefore the first vale will be emitted at 20 (current: 10 + 10 configured)
scheduler.advanceTimeTo(21, TimeUnit.SECONDS)
// if the switch did not happen, there would be 7 values
test.assertValues(0, 1, 2, 0)
}