如何使用自定义WEIRE功能缓冲排放



我需要像 monix.Observable.bufferTimedAndCounted这样的功能,但具有自定义" weither"。我找到了bufferTimedWithPressure运算符,该操作员允许使用Item的Weith:

val subj = PublishSubject[String]()
subj
  .bufferTimedWithPressure(1.seconds, 5, _ => 3)
  .subscribe(s => {
    println(s)
    Future.successful(Ack.Continue)
  })
for (i <- 1 to 60) {
  Thread.sleep(100)
  subj.onNext(i.toString)
}

但要发射每个指定的持续时间。我需要诸如bufferTimedAndCounted之类的行为,因此在缓冲液满足时会发生排放。如何实现?

我从monix来源复制了BuffertimedObserbable,并稍微更改它,添加重量功能(注意 - 在所有情况下我都没有对其进行测试(:

import java.util.concurrent.TimeUnit
import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.{CompositeCancelable, MultiAssignCancelable}
import monix.execution.{Ack, Cancelable}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration, MILLISECONDS}
/**
  * Copied from monix sources, adopted to size instead count
  *
  */  
 final class BufferTimedWithWeigthObservable[+A](source: Observable[A], timespan: FiniteDuration, maxSize: Int, sizeOf: A => Int)
  extends Observable[Seq[A]] {
  require(timespan > Duration.Zero, "timespan must be strictly positive")
  require(maxSize >= 0, "maxSize must be positive")
  def unsafeSubscribeFn(out: Subscriber[Seq[A]]): Cancelable = {
    val periodicTask = MultiAssignCancelable()
    val connection = source.unsafeSubscribeFn(new Subscriber[A] with Runnable {
      self =>
      implicit val scheduler = out.scheduler
      private[this] val timespanMillis = timespan.toMillis
      // MUST BE synchronized by `self`
      private[this] var ack: Future[Ack] = Continue
      // MUST BE synchronized by `self`
      private[this] var buffer = ListBuffer.empty[A]
      // MUST BE synchronized by `self`
      private[this] var currentSize = 0
      private[this] var sizeOfLast = 0
      private[this] var expiresAt = scheduler.clockMonotonic(MILLISECONDS) + timespanMillis
      locally {
        // Scheduling the first tick, in the constructor
        periodicTask := out.scheduler.scheduleOnce(timespanMillis, TimeUnit.MILLISECONDS, self)
      }
      // Runs periodically, every `timespan`
      def run(): Unit = self.synchronized {
        val now = scheduler.clockMonotonic(MILLISECONDS)
        // Do we still have time remaining?
        if (now < expiresAt) {
          // If we still have time remaining, it's either a scheduler
          // problem, or we rushed to signaling the bundle upon reaching
          // the maximum size in onNext. So we sleep some more.
          val remaining = expiresAt - now
          periodicTask := scheduler.scheduleOnce(remaining, TimeUnit.MILLISECONDS, self)
        } else if (buffer != null) {
          // The timespan has passed since the last signal so we need
          // to send the current bundle
          sendNextAndReset(now, byPeriod = true).syncOnContinue(
            // Schedule the next tick, but only after we are done
            // sending the bundle
            run())
        }
      }
      // Must be synchronized by `self`
      private def sendNextAndReset(now: Long, byPeriod: Boolean = false): Future[Ack] = {
        val prepare = if (byPeriod) buffer else buffer.dropRight(1)
        // Reset
        if (byPeriod) {
          buffer = ListBuffer.empty[A]
          currentSize = 0
          sizeOfLast = 0
        } else {
          buffer = buffer.takeRight(1)
          currentSize = sizeOfLast
        }
        // Setting the time of the next scheduled tick
        expiresAt = now + timespanMillis
        ack = ack.syncTryFlatten.syncFlatMap {
          case Continue => out.onNext(prepare)
          case Stop => Stop
        }
        ack
      }
      def onNext(elem: A): Future[Ack] = self.synchronized {
        val now = scheduler.clockMonotonic(MILLISECONDS)
        buffer.append(elem)
        sizeOfLast = sizeOf(elem)
        currentSize = currentSize + sizeOfLast
        // 9 and 9 true
        //10 and 9
        if (expiresAt <= now || (maxSize > 0 && maxSize < currentSize)) {
          sendNextAndReset(now)
        }
        else {
          Continue
        }
      }
      def onError(ex: Throwable): Unit = self.synchronized {
        periodicTask.cancel()
        ack = Stop
        buffer = null
        out.onError(ex)
      }
      def onComplete(): Unit = self.synchronized {
        periodicTask.cancel()
        if (buffer.nonEmpty) {
          val bundleToSend = buffer.toList
          // In case the last onNext isn't finished, then
          // we need to apply back-pressure, otherwise this
          // onNext will break the contract.
          ack.syncOnContinue {
            out.onNext(bundleToSend)
            out.onComplete()
          }
        } else {
          // We can just stream directly
          out.onComplete()
        }
        // GC relief
        buffer = null
        // Ensuring that nothing else happens
        ack = Stop
      }
    })
    CompositeCancelable(connection, periodicTask)
  }
}

如何使用它:

object MonixImplicits {
  implicit class RichObservable[+A](source: Observable[A]) {
    def bufferTimedAndSized(timespan: FiniteDuration, maxSize: Int, sizeOf: A => Int): Observable[Seq[A]] = {
      new BufferTimedWithWeigthObservable(source, timespan, maxSize, sizeOf)
    }
  }
}
import MonixImplicits._
someObservable.bufferTimedAndSized(1.seconds, 5, item => item.size)

相关内容

  • 没有找到相关文章

最新更新