我正在尝试弄清楚如何使用akka-stream执行以下有状态操作:
假设我通过流发出一组元素 e,其中包含一组任意元素 a。
我想根据元素 e 接收的元素a 的总数来限制通过下游的元素 e 的数量。
例如 4
传入流
--> E1(A1E1(
--> E2(A1E2, A2E2(
--> E3(A1E3(
--> E4(A1E4, A2E4(
--> E5(A1E5, A2E5(
会发出
组 1 [E1、E2、E3]
组 2 [E4, E5]
最终,这应该像在 groupWithin 中一样计时。如果经过一定的时间,那么只需发出您拥有的任何东西。
听起来statefulmapContact可能是要看的东西,但我不确定。
我从描述中假设,你想要控制你向下游生成元素的速度。此外,处理每个元素的成本也不同。
开箱即用的选项很少,可以控制流的速度。
可能你想使用
throttle
.它控制流的吞吐量。限制 - 将吞吐量限制为每个时间单位的特定元素数,或每个时间单位的特定总成本,其中必须提供一个函数来计算每个元素的单个成本。 import java.time.LocalDateTime
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.Random object ThrottleExample extends App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class E(as: Seq[Int]) val f = Source(1 to 20) .map(_ => Random.nextInt(7)) .map(len => E((1 to len).map(_ => 1))) .throttle(5, 1.second, _.as.size) .runForeach(e => { println(s"${LocalDateTime.now()} -> $e") }) f.onComplete(_ => { mat.shutdown() sys.terminate() }) }
另一种选择是使用分组流的组合,例如
groupedWeightedWithin
(将元素批量处理到某个批次成本或经过的时间(或batchWeighted
(如果下游速度较慢,则进行批量/聚合(以及简单的throttle
。groupedWeightedIn - 将此流分成在时间窗口内接收的元素组,或受元素权重的限制,无论先发生什么。
batchWeighted - 只要存在背压并且尚未达到批处理元素的最大权重,就可以通过将传入元素和摘要传递到聚合函数中来允许下游速度较慢。import java.time.LocalDateTime import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.Random object GroupedWithingExample extends App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class E(as: Seq[Int]) val f = Source(1 to 20) .map(_ => Random.nextInt(5)) .map(len => E((1 to len).map(_ => 1))) .groupedWeightedWithin(7, 1.second)(_.as.length) .throttle(1, 1.second) .runForeach(e => { println(s"${LocalDateTime.now()} -> $e") }) f.onComplete(_ => { mat.shutdown() sys.terminate() }) }