我遇到过消息通过组件传递的情况,这可能会延迟它们。
在压力下,我想跳过这个组件,以便同时延迟的消息不超过 X 条。溢出的消息将跳过此阶段并进入蒸汽的下一阶段。
消息在此阶段内停滞不前,直到其未来完成,或最多一分钟,以先到者为准。
我可能可以实现类似于此缓冲区示例的自定义 GraphStage,或者使用带有一些计数器的divertTo 来导致消息跳过停滞的组件, 但感觉在 AKKA 流中可能有更简单的方法
我一直在玩你的用例,并提出了一个基于代表计数器和异步映射阶段的Akka Actor
的解决方案:
这个想法是,在任何给定时间最多可以处理3
个元素,并且基于最大容量为2
的计数器,我们只允许这些元素中的最多2
同时由慢速组件处理。
这样,始终有一个处理线程保留给上游元素,这些线程将从慢速组件分支出来并直接到达下游。
让我们首先定义一个具有最大容量的基本Counter
作为Akka Actor
:
class Counter(max: Int) extends Actor {
private var count: Int = 0
override def receive: Receive = {
case TryAndLock if count < max =>
count += 1
sender ! true
case TryAndLock =>
sender ! false
case Release =>
count -= 1
}
}
sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction
val counter = system.actorOf(Props(new Counter(max = 2)))
它包含一个可变的count
变量,可以通过TryAndLock
请求递增,但前提是计数尚未达到最大容量,并且可以通过Release
请求递减。
我们使用Actor
以便正确处理以下mapAsync
阶段中的并发锁定和释放操作,而无需争用条件。
然后,只需使用并行度仅比计数器最大容量高 1 个单元的mapAsyncUnordered
级即可。
通过异步阶段的任何元素都将查询Counter
以尝试锁定资源。如果资源被锁定,则该元素将被扔到慢速组件中。如果没有,它将跳过它。元素被传递到慢速组件中,直到我们达到计数器的最大容量,此时任何新元素都将被跳过,直到元素退出慢速组件并从计数器释放资源。
我们不能简单地使用mapAsync
因为元素在存在阶段时会保持其上游的顺序,使跳过的元素等待慢速组件处理的元素完成,然后再在下游生成。因此,有必要改用mapAsyncUnordered
。
让我们定义一个示例,其中最多 2 个元素由慢速组件同时处理,以及并行度为 3 的异步映射:
Source(0 to 15)
.throttle(1, 50.milliseconds)
.mapAsyncUnordered(parallelism = 3) { i =>
(counter ? TryAndLock).map {
case locked: Boolean if locked =>
val result = slowTask(i)
counter ! Release
result
case _ =>
skipTask(i)
}
}
.runForeach(println)
例如,这两个函数将模拟慢速组件(slowTask
(以及跳过慢速组件(skipTask
(时该怎么做:
def slowTask(value: Int): String = {
val start = Instant.now()
Thread.sleep(250)
s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
s"$value - skipped - ${Instant.now()}"
这会导致类似这样的结果:
2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z
其中第一部分是上游元素的索引,第二部分是元素应用的转换(processed
进入慢速组件或skipped
时(,最后一部分是时间戳,以便我们可视化事情发生的时间。
进入阶段的前 2 个元素(0 和 1(由慢速组件处理,一堆后续元素(2、3、4 和 5(跳过慢速阶段,直到这两个前元素完成并且其他元素可以进入慢速阶段。等等。
吉尔福伊尔