当流停止时,跳过部分流

  • 本文关键字:过部 分流 akka-stream
  • 更新时间 :
  • 英文 :


我遇到过消息通过组件传递的情况,这可能会延迟它们。

在压力下,我想跳过这个组件,以便同时延迟的消息不超过 X 条。溢出的消息将跳过此阶段并进入蒸汽的下一阶段。

消息在此阶段内停滞不前,直到其未来完成,或最多一分钟,以先到者为准。

我可能可以实现类似于此缓冲区示例的自定义 GraphStage,或者使用带有一些计数器的divertTo 来导致消息跳过停滞的组件, 但感觉在 AKKA 流中可能有更简单的方法

我一直在玩你的用例,并提出了一个基于代表计数器和异步映射阶段的Akka Actor的解决方案:

这个想法是,在任何给定时间最多可以处理3个元素,并且基于最大容量为2的计数器,我们只允许这些元素中的最多2同时由慢速组件处理。

这样,始终有一个处理线程保留给上游元素,这些线程将从慢速组件分支出来并直接到达下游。


让我们首先定义一个具有最大容量的基本Counter作为Akka Actor

class Counter(max: Int) extends Actor {
private var count: Int = 0
override def receive: Receive = {
case TryAndLock if count < max =>
count += 1
sender ! true
case TryAndLock =>
sender ! false
case Release =>
count -= 1
}
}
sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction
val counter = system.actorOf(Props(new Counter(max = 2)))

它包含一个可变的count变量,可以通过TryAndLock请求递增,但前提是计数尚未达到最大容量,并且可以通过Release请求递减。

我们使用Actor以便正确处理以下mapAsync阶段中的并发锁定和释放操作,而无需争用条件。


然后,只需使用并行度仅比计数器最大容量高 1 个单元的mapAsyncUnordered级即可。

通过异步阶段的任何元素都将查询Counter以尝试锁定资源。如果资源被锁定,则该元素将被扔到慢速组件中。如果没有,它将跳过它。元素被传递到慢速组件中,直到我们达到计数器的最大容量,此时任何新元素都将被跳过,直到元素退出慢速组件并从计数器释放资源。

我们不能简单地使用mapAsync因为元素在存在阶段时会保持其上游的顺序,使跳过的元素等待慢速组件处理的元素完成,然后再在下游生成。因此,有必要改用mapAsyncUnordered

让我们定义一个示例,其中最多 2 个元素由慢速组件同时处理,以及并行度为 3 的异步映射:

Source(0 to 15)
.throttle(1, 50.milliseconds)
.mapAsyncUnordered(parallelism = 3) { i =>
(counter ? TryAndLock).map {
case locked: Boolean if locked =>
val result = slowTask(i)
counter ! Release
result
case _ =>
skipTask(i)
}
}
.runForeach(println)

例如,这两个函数将模拟慢速组件(slowTask(以及跳过慢速组件(skipTask(时该怎么做:

def slowTask(value: Int): String = {
val start = Instant.now()
Thread.sleep(250)
s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
s"$value - skipped - ${Instant.now()}"

这会导致类似这样的结果:

2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z

其中第一部分是上游元素的索引,第二部分是元素应用的转换(processed进入慢速组件或skipped时(,最后一部分是时间戳,以便我们可视化事情发生的时间。

进入阶段的前 2 个元素(0 和 1(由慢速组件处理,一堆后续元素(2、3、4 和 5(跳过慢速阶段,直到这两个前元素完成并且其他元素可以进入慢速阶段。等等。

吉尔福伊尔

最新更新