我是RxJava的新手,无法意识到 - 为什么当我使用两个PublishSubject时,我的"zipped"可观察量没有发出项目?(据我所知,ZIP运算符应该将两个流"合并"为一个)
val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onNext(1)
maxSubject.onNext(2)
Log.d("custom", "BINGO!")
val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
{ Log.d("custom", it) },
{ Log.d("custom", "BONGO!") },
{ Log.d("custom", "KONGO!") }
)
currentSubject.onComplete()
maxSubject.onComplete()
我希望这些项目显示在"{ Log.d("custom", it) }"函数中,但它没有发生。我做错了什么?
编译后日志:
2019-06-25 22:25:36.802 3631-3631/ru.grigoryev.rxjavatestdeleteAfter D/custom:宾果游戏!
2019-06-25 22:25:36.873 3631-3631/ru.grigoryev.rxjavatestdeleteafter D/custom: KONGO!
这里的问题不在于您的zip
实现,而在于PublishSubject
的默认行为。但首先,让我们备份
热可观察量和冷量
在Rx中,有两种类型的Obervables
,hot
和cold
。最常见的类型是可观察cold
。cold
oberable 在调用它之前不会开始发出.subscribe()
值。
val obs = Observable.fromIterable(listOf(1, 2, 3, 4);
obs.subscribe { print(it) }
// Prints 1, 2, 3, 4
hot
可观察量将发出值,而不管观察者是否订阅了它。
val subject = PublishSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe { print(it) }
subject.onNext(3)
subject.onNext(4)
// Prints 3, 4
请注意如何1
和2
未打印的位置。这是因为PublishSubject
是可观察hot
的,并且在订阅之前发出1
和2
。
返回您的问题
在您的示例中,您的发布主题在订阅之前发出 1 和 2。若要查看它们zipped
在一起,请移动代码。
val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()
Log.d("custom", "BINGO!")
val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
{ Log.d("custom", it) },
{ Log.d("custom", "BONGO!") },
{ Log.d("custom", "KONGO!") }
)
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onComplete()
maxSubject.onComplete()