RxJava – 如何等待订阅者处理流



这与以下问题完全相同:

RX:如何等待订阅者完成?

我有一个生产者,它可以比订阅者更快地生成对象,所以我希望它并行生成对象,但在一些固定数量的项目之后进行阻塞。

像这样:

  1. 发布者生成项目
  2. 一旦生产出10个项目,订阅者就会接受它们并开始处理
  3. publisher继续并行地生成和缓冲项,但若生成了下一个10项,则等待subscriber
  4. 处理完成后,订阅者立即从缓冲区中取出接下来的10个对象
  5. 发布者继续生产和缓冲项目
  6. 等等

当前解决方案:

val bars = Flowable.fromStream(foos)
// calculate is faster
.map(foo -> calculateBars(foo))
.buffer(10)
// than saving to database
.flatMap(bars -> Flowable.fromStream(saveBars(bars)))
.collect(Collectors.toSet())
.blockingGet();

若我添加.observeOn(Schedulers.io()),那个么发布者根本不会等待使用者,最终会因OutOfMemoryError而崩溃。如果我删除它,它会一直等待,而不是准备下一批项目。

这与我的另一个问题有关https://stackoverflow.com/questions/73660953/keep-memory-usage-constant-with-jparepository-andrxjava。我认为解决这个问题就能解决这个问题。

经过数小时在java中尝试实现这一点后,我放弃了使用kotlin协程解决了这个问题

@Transactional(readOnly = true)
open fun calculate(): Set<UUID> {
val fooIds = findOperators()
val saved = mutableListOf<UUID>()
runBlocking(Dispatchers.IO) {
// Run calculations in a separated thread, which communicates
// with the main thread via a channel of batches.
// Meanwhile, the main thread is busy saving records to database as soon as they arrive
// from the calculation thread.
val channel = Channel<Collection<Bar>>(1)
launch {
var batch = mutableListOf<Bar>()
for (id in fooIds) {
val records = findRecordsByFoo(id)
if (batch.size == BATCH_SIZE) {
channel.send(batch)
batch = mutableListOf()
}
}
if (batch.isNotEmpty()) {
channel.send(batch)
}
try {
channel.close()
}
catch (e: ClosedReceiveChannelException) {
log.info("Finished processing foos")
}
}
for (bars in channel) {
val uuids = saveBars(bars)
saved.addAll(uuids.asSequence())
}
}
fooIds.close()
return saved.toSet()
}

编辑:这些是必需的渐变:

plugins {
id 'org.jetbrains.kotlin.jvm' version '1.7.10'
}
dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4'
}
// to put the kotlin file close to a java service which calls it:
sourceSets {
main.kotlin.srcDirs += 'src/main/java'
main.java.srcDirs += 'src/main/java'
}

最新更新