连接Akka流中的两个流



我正在尝试连接两个流,我无法解释我的实现的输出。

val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
broadcast ~> flow1 ~> concat.in(0)
broadcast ~> flow2 ~> concat.in(1)
FlowShape(broadcast.in, concat.out)
}
)
source.via(flowGraph).runWith(sink)

我希望从这段代码得到以下输出:

2
3
4
.
.
.
11
10
20
.
.
.
100

我只看到"2"被打印出来。你能解释一下我在实现中出了什么问题,我该如何修改程序以得到想要的输出吗?

来自Akka Stream的API文档:

Concat:

在当前流中有可用元素时发出;如果当前输入完成,则尝试下一个

Broadcast:

在所有输出停止反压并且有可用的输入元素时发出

这两个操作符不能一起工作,因为它们的工作方式存在冲突——Concat试图在切换到另一个输出之前从Broadcast的一个输出中提取所有元素,而Broadcast除非对其所有输出都有需求,否则不会释放。

对于您需要的,您可以使用concat按照注释建议进行连接:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

或者类似地,像下面这样使用Source.combine:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)

使用GraphDSL,这是Source.combine:

的简化版本
val sg = Source.fromGraph(
GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
source ~> flow1 ~> concat
source ~> flow2 ~> concat
SourceShape(concat.out)
}
)
sg.runWith(sink)

相关内容

  • 没有找到相关文章

最新更新