RxJava和PublishSubject的初学者问题



我对RxJava中的PublishSubject有疑问。我已经创建了一个发出一些对象的伪PublishSubject。这是我的代码:

override fun generate(exportRequest: ExportRequest): Observable<Report> {
val faker = Faker()
val dummyPublisher = PublishSubject.create<Report>()
for(x in 1..1_000){
val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
val report = Report(dataToExport)
sddPublisher.onNext(report)
Thread.sleep(1)
}
dummyPublisher.onComplete()
return dummyPublisher
}

订阅时,不会发出任何对象。例如,没有打印任何内容:

... // somewhere in the code
reportStrategy.generate(exportRequest).subscribe { report: Report? ->
println(report)
}

也许我错过了什么。如有任何帮助,将不胜感激

正如@akarnokd在注释中所指出的,您创建的PublishSubject会立即发出其onNext方法传递给它的任何值。无论当前是否订阅了它,都会发生这种情况。它的设计主要是为了帮助弥合基于命令或回调的代码与响应代码之间的差距。

你似乎想要的是一个Observable,一旦有人订阅了它,它就会开始执行一些同步代码。Observable.create是创建这样一个实例的一种方法,但正确使用可能会很麻烦。

创建所需内容的一种更方便的方法是Observable.fromPublisher。它以Publisher作为自变量。Publisher本身就是一个函数,每当Observer订阅由fromPublisher创建的Observable时,它就会传递一个Subscriber实例,并允许您直接向该Observer发送事件。

你想要的代码看起来像这样:

fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
return Observable.fromPublisher { subscriber ->
for (x in 1..1_000) {
val fakeReport = genFakeReport()
subscriber.onNext(fakeReport)
Thread.sleep(1)
}
subscriber.onComplete()
}
}
fun main() {
/** supply whatever logic you want to generate a fake [Report] */
fun genFakeReport(): Report = TODO()
val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}

一旦订阅了generateReportStream返回的Observable实例,这将正确地发出值。此外,可以对同一实例进行更多的订阅,每个订阅都将使用相同的逻辑发出新的序列值。

最新更新