我正在尝试连接两个流,我无法解释我的实现的输出。
val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
broadcast ~> flow1 ~> concat.in(0)
broadcast ~> flow2 ~> concat.in(1)
FlowShape(broadcast.in, concat.out)
}
)
source.via(flowGraph).runWith(sink)
我希望从这段代码得到以下输出:
2
3
4
.
.
.
11
10
20
.
.
.
100
我只看到"2"被打印出来。你能解释一下我在实现中出了什么问题,我该如何修改程序以得到想要的输出吗?
来自Akka Stream的API文档:
Concat
:
在当前流中有可用元素时发出;如果当前输入完成,则尝试下一个
Broadcast
:
在所有输出停止反压并且有可用的输入元素时发出
这两个操作符不能一起工作,因为它们的工作方式存在冲突——Concat
试图在切换到另一个输出之前从Broadcast
的一个输出中提取所有元素,而Broadcast
除非对其所有输出都有需求,否则不会释放。
对于您需要的,您可以使用concat
按照注释建议进行连接:
source.via(flow1).concat(source.via(flow2)).runWith(sink)
或者类似地,像下面这样使用Source.combine
:
Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
使用GraphDSL
,这是Source.combine:
val sg = Source.fromGraph(
GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
source ~> flow1 ~> concat
source ~> flow2 ~> concat
SourceShape(concat.out)
}
)
sg.runWith(sink)