rxjava随着意外的延迟延迟了可观察的火灾



我使用rxjava可观测值设置了事件序列。基本上,我将与Observable.just(Events.*)创建的不同事件合并为与observable.delay(time, timeUnit, scheduler)函数设置的不同延迟。然后,我将它们发布到PublishSubject(以下代码中的events),然后订阅该PublishSubject以观察序列(以下代码中的observeEvents()函数)。它过去效果很好,但最近我看到设备上的行为非常奇怪(Android 5.0.2的OnePlus One)(并且在模拟器上看不到它)。基本上,事件混杂在一起,延迟较高的事件可能会发生在延迟较小的事件之前,延迟较小的事件可能会在队列结束时出现,有时所有事件都可能按正确的顺序出现。前三个事件通常混合在一起。有时根本没有观察到某些事件。这里可能会发生什么?

代码在kotlin中:

var computationScheduler = Schedulers.computation()
private val events: PublishSubject<Events> = PublishSubject.create()
private val userActionSubject: PublishSubject<Events> = PublishSubject.create()
Observable.merge(
            event0(),
            event1(),
            event2(),
            userActionOrEvent3(),
            userActionOrEvent4())
            .subscribe({
                // Weird timings are observed here already
                events.onNext(it)
            }, { e ->
                events.onError(e)
            }))
private fun userActionOrEvent4(): Observable<Events> {
    return Observable.amb(Observable.just(Events.Event4)
            .delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
            .take(1)
}
private fun userActionOrEvent3(): Observable<Events> {
    return Observable.amb(Observable.just(Events.Event3)
            .delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
            .take(1)
}
private fun event2() = Observable.just(Events.Event2)
        .delay(1800, TimeUnit.MILLISECONDS, computationScheduler)
private fun event1() = Observable.just(Events.Event1)
        .delay(200, TimeUnit.MILLISECONDS, computationScheduler)
private fun event0() = Observable.just(Events.Event0)
        .subscribeOn(computationScheduler)
open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread())
open fun onUserAction() {
    userActionSubject.onNext(Events.Action)
}

这是因为您使用的是merge(),而不是concat

这篇媒介文章将向您解释差异https://medium.com/fueled-android/rxify-a-simple-simple-for-for-complex-rxjava-operators-part-2-b82b379f5c79f5c7f#.ekppat50o

结果表明,当我将Schedulers.computation()更改为Schedulers.newThread()事件时,问题是由ComputationsCheduler引起的。

最新更新