ZIP 运算符不适用于 PublishSubject,我做错了什么?



我是RxJava的新手,无法意识到 - 为什么当我使用两个PublishSubject时,我的"zipped"可观察量没有发出项目?(据我所知,ZIP运算符应该将两个流"合并"为一个)

val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onNext(1)
maxSubject.onNext(2)
Log.d("custom", "BINGO!")
val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
{ Log.d("custom", it) },
{ Log.d("custom", "BONGO!") },
{ Log.d("custom", "KONGO!") }
)
currentSubject.onComplete()
maxSubject.onComplete()

我希望这些项目显示在"{ Log.d("custom", it) }"函数中,但它没有发生。我做错了什么?

编译后日志:

2019-06-25 22:25:36.802 3631-3631/ru.grigoryev.rxjavatestdeleteAfter D/custom:宾果游戏!

2019-06-25 22:25:36.873 3631-3631/ru.grigoryev.rxjavatestdeleteafter D/custom: KONGO!

这里的问题不在于您的zip实现,而在于PublishSubject的默认行为。但首先,让我们备份

热可观察量和冷量

在Rx中,有两种类型的Obervableshotcold。最常见的类型是可观察coldcoldoberable 在调用它之前不会开始发出.subscribe()值。

val obs = Observable.fromIterable(listOf(1, 2, 3, 4);
obs.subscribe { print(it) }
// Prints 1, 2, 3, 4

hot可观察量将发出值,而不管观察者是否订阅了它。

val subject = PublishSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe { print(it) }
subject.onNext(3)
subject.onNext(4)
// Prints 3, 4

请注意如何12未打印的位置。这是因为PublishSubject是可观察hot的,并且在订阅之前发出12

返回您的问题

在您的示例中,您的发布主题在订阅之前发出 1 和 2。若要查看它们zipped在一起,请移动代码。

val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()
Log.d("custom", "BINGO!")
val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
{ Log.d("custom", it) },
{ Log.d("custom", "BONGO!") },
{ Log.d("custom", "KONGO!") }
)
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onNext(1)
maxSubject.onNext(2)

currentSubject.onComplete()
maxSubject.onComplete()

相关内容

最新更新