我有一个发出许多对象的可观察对象,我想使用window
或buffer
操作对这些对象进行分组。但是,我希望能够使用自定义条件,而不是指定一个count
参数来确定窗口中应该有多少对象。
例如,假设可观察量正在发出如下所示的Message
类实例。
class Message(
val int size: Int
)
我想根据消息实例的size
变量而不仅仅是它们的计数来缓冲或窗口消息实例。例如,获取总大小最多为 5000 的消息窗口。
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
有没有简单的方法可以做到这一点?
首先,我必须承认,我不是RxJava专家。 我只是发现你的问题具有挑战性,并试图找到解决方案。
有一个带有参数boundaryIndicator
的window()
函数。如果达到窗口大小,则必须创建一个发出项目的Publisher
/Flowable
。
在示例中,我创建了一个用作boundaryIndicator
的对象windowManager
。在onNext
回调中,我调用了windowManager
并给它一个打开新窗口的机会。
val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}