我需要类似 monix 的 Observable.bufferTimedAndCounted
这样的功能,但要支持自定义的"权重(weight)"。我找到了 bufferTimedWithPressure
运算符,它允许为每个元素指定权重:
// Subject that the loop below pushes strings into by hand.
val subj = PublishSubject[String]()

// Buffer with a 1-second period and a max size of 5; the third argument
// assigns every item a constant weight of 3.
val batches = subj.bufferTimedWithPressure(1.seconds, 5, _ => 3)

// Print each emitted batch and always ask for more.
batches.subscribe { batch =>
  println(batch)
  Future.successful(Ack.Continue)
}

// Push the numbers 1..60 as strings, pausing 100 ms before each one.
(1 to 60).foreach { n =>
  Thread.sleep(100)
  subj.onNext(n.toString)
}
但它只会在每个指定的时间间隔到期时才发射。我需要类似 bufferTimedAndCounted
的行为,即缓冲区一满就立即发射。该如何实现?
我从 monix 源码中复制了 BufferTimedObservable,并稍作修改,加入了权重函数(注意:我并没有在所有情况下对它进行测试):
import java.util.concurrent.TimeUnit
import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.{CompositeCancelable, MultiAssignCancelable}
import monix.execution.{Ack, Cancelable}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration, MILLISECONDS}
/**
  * Buffers the source into bundles bounded both by time and by accumulated weight.
  *
  * Adapted from the Monix sources, using a per-element weight (`sizeOf`)
  * instead of an element count.
  *
  * Bundles are emitted downstream in two situations:
  *  - the periodic task (`run`) fires after the deadline has passed: the whole
  *    buffer is emitted and the buffer is reset to empty;
  *  - `onNext` observes that the deadline has passed or that the accumulated
  *    weight exceeds `maxSize`: everything except the newest element is emitted,
  *    and the newest element becomes the first element of the next bundle.
  *
  * On completion any buffered elements are flushed as a final bundle.
  *
  * @param source   the upstream observable
  * @param timespan the period between timed emissions; must be strictly positive
  * @param maxSize  the weight threshold; `0` disables the weight check
  * @param sizeOf   computes the weight of a single element
  */
final class BufferTimedWithWeigthObservable[+A](
  source: Observable[A],
  timespan: FiniteDuration,
  maxSize: Int,
  sizeOf: A => Int)
  extends Observable[Seq[A]] {

  require(timespan > Duration.Zero, "timespan must be strictly positive")
  // Zero is accepted on purpose: `onNext` only applies the weight limit when maxSize > 0.
  require(maxSize >= 0, "maxSize must be non-negative")

  def unsafeSubscribeFn(out: Subscriber[Seq[A]]): Cancelable = {
    val periodicTask = MultiAssignCancelable()

    val connection = source.unsafeSubscribeFn(new Subscriber[A] with Runnable {
      self =>

      // Explicit type: implicit definitions should not rely on inference.
      implicit val scheduler: monix.execution.Scheduler = out.scheduler

      private[this] val timespanMillis = timespan.toMillis
      // MUST BE synchronized by `self`
      private[this] var ack: Future[Ack] = Continue
      // MUST BE synchronized by `self`
      private[this] var buffer = ListBuffer.empty[A]
      // MUST BE synchronized by `self`: total weight of the elements in `buffer`
      private[this] var currentSize = 0
      // Weight of the most recently appended element
      private[this] var sizeOfLast = 0
      // Monotonic-clock deadline (in millis) of the current time window
      private[this] var expiresAt = scheduler.clockMonotonic(MILLISECONDS) + timespanMillis

      locally {
        // Scheduling the first tick, in the constructor
        periodicTask := out.scheduler.scheduleOnce(timespanMillis, TimeUnit.MILLISECONDS, self)
      }

      // Runs periodically, every `timespan`
      def run(): Unit = self.synchronized {
        val now = scheduler.clockMonotonic(MILLISECONDS)
        // Do we still have time remaining?
        if (now < expiresAt) {
          // If we still have time remaining, it's either a scheduler
          // problem, or we rushed to signaling the bundle upon reaching
          // the maximum size in onNext. So we sleep some more.
          val remaining = expiresAt - now
          periodicTask := scheduler.scheduleOnce(remaining, TimeUnit.MILLISECONDS, self)
        } else if (buffer != null) {
          // The timespan has passed since the last signal so we need
          // to send the current bundle
          sendNextAndReset(now, byPeriod = true).syncOnContinue(
            // Schedule the next tick, but only after we are done
            // sending the bundle
            run())
        }
      }

      /**
        * Emits the pending bundle and resets the buffer state.
        *
        * When `byPeriod` is true the whole buffer is emitted and the buffer is
        * emptied; otherwise the newest element is held back and carried over
        * as the sole content of the next buffer.
        *
        * Must be synchronized by `self`.
        */
      private def sendNextAndReset(now: Long, byPeriod: Boolean = false): Future[Ack] = {
        // Emit an immutable snapshot rather than the mutable buffer itself
        val bundle: List[A] = if (byPeriod) buffer.toList else buffer.dropRight(1).toList
        // Reset
        if (byPeriod) {
          buffer = ListBuffer.empty[A]
          currentSize = 0
          sizeOfLast = 0
        } else {
          // Keep only the newest element; its weight seeds the next bundle
          buffer = buffer.takeRight(1)
          currentSize = sizeOfLast
        }
        // Setting the time of the next scheduled tick
        expiresAt = now + timespanMillis
        // Chain on the previous ack so emissions stay ordered and back-pressured
        ack = ack.syncTryFlatten.syncFlatMap {
          case Continue => out.onNext(bundle)
          case Stop => Stop
        }
        ack
      }

      def onNext(elem: A): Future[Ack] = self.synchronized {
        val now = scheduler.clockMonotonic(MILLISECONDS)
        buffer.append(elem)
        sizeOfLast = sizeOf(elem)
        currentSize = currentSize + sizeOfLast
        // Flush when the deadline has passed, or when the weight limit is
        // enabled (maxSize > 0) and this element pushed the total above it.
        if (expiresAt <= now || (maxSize > 0 && maxSize < currentSize)) {
          sendNextAndReset(now)
        } else {
          Continue
        }
      }

      def onError(ex: Throwable): Unit = self.synchronized {
        periodicTask.cancel()
        ack = Stop
        buffer = null
        out.onError(ex)
      }

      def onComplete(): Unit = self.synchronized {
        periodicTask.cancel()
        if (buffer.nonEmpty) {
          val bundleToSend = buffer.toList
          // In case the last onNext isn't finished, then
          // we need to apply back-pressure, otherwise this
          // onNext will break the contract.
          ack.syncOnContinue {
            out.onNext(bundleToSend)
            out.onComplete()
          }
        } else {
          // We can just stream directly
          out.onComplete()
        }
        // GC relief
        buffer = null
        // Ensuring that nothing else happens
        ack = Stop
      }
    })

    CompositeCancelable(connection, periodicTask)
  }
}
如何使用它:
/** Extension syntax for `Observable`; bring it into scope with `import MonixImplicits._`. */
object MonixImplicits {

  /** Wraps an `Observable` to expose `bufferTimedAndSized` on it. */
  implicit class RichObservable[+A](source: Observable[A]) {

    /**
      * Wraps `source` in a `BufferTimedWithWeigthObservable`.
      *
      * @param timespan passed through as the operator's `timespan`
      * @param maxSize  passed through as the operator's `maxSize`
      * @param sizeOf   passed through as the operator's per-element weight function
      * @return the resulting observable of element bundles
      */
    def bufferTimedAndSized(timespan: FiniteDuration, maxSize: Int, sizeOf: A => Int): Observable[Seq[A]] =
      new BufferTimedWithWeigthObservable[A](source, timespan, maxSize, sizeOf)
  }
}
import MonixImplicits._
// One-second timespan, max size 5, each item weighed by its own `size`.
someObservable.bufferTimedAndSized(timespan = 1.seconds, maxSize = 5, sizeOf = _.size)