具有自定义计数条件的 RxJava 缓冲区/窗口



我有一个发出许多对象的可观察对象,我想使用windowbuffer操作对这些对象进行分组。但是,我希望能够使用自定义条件,而不是指定一个count参数来确定窗口中应该有多少对象。

例如,假设可观察量正在发出如下所示的Message类实例。

class Message(
val int size: Int
)

我想根据消息实例的size变量而不仅仅是它们的计数来缓冲或窗口消息实例。例如,获取总大小最多为 5000 的消息窗口。

// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)

有没有简单的方法可以做到这一点?

首先,我必须承认,我不是RxJava专家。 我只是发现你的问题具有挑战性,并试图找到解决方案。

有一个带有参数boundaryIndicatorwindow()函数。如果达到窗口大小,则必须创建一个发出项目的Publisher/Flowable

在示例中,我创建了一个用作boundaryIndicator的对象windowManager。在onNext回调中,我调用了windowManager并给它一个打开新窗口的机会。

val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}

相关内容

  • 没有找到相关文章

最新更新