折叠和减少在并行运行时显示出不确定性行为,为什么



所以我正在尝试使用Akka Streams统计项目的出现次数。下面的例子是我所拥有的一个简化版本。我需要两条管道同时工作。由于某些原因,打印的结果不正确。

有人知道为什么会发生这种事吗?我是不是遗漏了一些关于子流的重要内容?

/**
* SIMPLE EXAMPLE
*/
object TestingObject {
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file.Paths
import akka.util.ByteString
import counting._
import graph_components._
// implicit actor system
implicit val system:ActorSystem = ActorSystem("Sys")
def main(args: Array[String]): Unit = {
val customFlow = Flow.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._

// Components
val A   = builder.add(Balance[(Int, Int)](2, waitForAllDownstreams = true));
val B1   = builder.add(mergeCountFold.async);
val B2   = builder.add(mergeCountFold.async);
val C   = builder.add(Merge[(Int, Int)](2));
val D   = builder.add(mergeCountReduce);
// Graph
A ~> B1 ~> C ~> D
A ~> B2 ~> C
FlowShape(A.in, D.out);
})
// Run
Source(0 to 101)
.groupBy(10, x => x % 4)
.map(x => (x % 4, 1))
.via(customFlow)
.mergeSubstreams
.to(Sink.foreach(println)).run();
}
def mergeCountReduce = Flow[(Int, Int)].reduce((l, r) => {
println("REDUCING");
(l._1, l._2 + r._2)
})
def mergeCountFold = Flow[(Int, Int)].fold[(Int,Int)](0,0)((l, r) => {
println("FOLDING");
(r._1, l._2 + r._2)
})
}

两个观察结果:

  • mergeCountReduce将发出它看到的第一个键以及看到的值的总和(如果它没有看到任何元素,则会使流失败(
  • mergeCountFold将发出它看到的最后一个键和看到的值的总和(如果它没有看到任何元素,则会发出一个键,值为零(

(在两种情况下,尽管密钥始终相同(

这两个观测值都不受CCD_ 3边界的影响。

然而,在前面的Balance运算符的上下文中,async引入了一个隐式缓冲区,它可以防止它包装的图反向压缩,直到该缓冲区满为止。Balance将流值发送到没有反向压力的第一个输出,因此,如果Balance之后的阶段没有明显慢于上游,则Balance可以仅将值发送到一个输出(在这种情况下为B1(。

在这种情况下,对于reduceB1将发出密钥和计数,而B2失败,导致整个流失败。

对于fold,在该场景中,B1将发射密钥和计数,而没有看到任何值的B2将发射(0,0)。合并会按照它们发出的顺序发出它们(合理地假设有50/50的机会(,所以最后的折叠要么有键和计数,要么有零和计数。

最新更新