我正在玩 Akka Streams,并且已经弄清楚了大部分基础知识,但我不清楚如何获取Merge
的结果并对其进行进一步的操作(映射、过滤、折叠等)。
我想修改以下代码,以便我可以进一步操作数据,而不是将合并管道传输到接收器。
implicit val materializer = FlowMaterializer()
val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)
val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val merge = Merge[Int]("m1")
items_a ~> merge
items_b ~> merge ~> sink
}.run()
我想我的主要问题是我无法弄清楚如何制作没有源的流组件,并且我无法弄清楚如何在不使用特殊的 Merge 对象和~>
语法的情况下进行合并。
编辑:这个问题和答案是针对并与Akka Streams 0.11一起使用
如果你不关心元素随机向下游移动的Merge
语义,那么你可以尝试concat
Source
,就像这样:
items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)
这里的区别在于,来自a
的所有项目将首先流向下游,然后再流向任何b
元素。 如果你真的需要 Merge
的行为,那么你可以考虑如下(请记住,你最终需要一个接收器,但你当然可以在合并后进行额外的转换):
val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)
val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val merge = Merge[Int]("m1")
items_a ~> merge
items_b ~> merge ~> transform
}.run
在此示例中,您可以看到我使用Flow
同伴中的帮助程序创建了一个没有特定输入Source
的Flow
。 然后,我可以从那里将其附加到合并点以获得我的附加处理。
使用 Source.combine
:
val items_a :: items_b :: items_c = List(
Source(List(10,20,30,40,50)),
Source(List(60,70,80,90,100),
Source(List(110,120,130,140,1500))
Source.combine(items_a, items_b, items_c : _*)(Merge(_))
.map(_+1)
.runForeach(println)
,如果您需要保留输入源的顺序(例如items_a必须在items_b之前,items_b必须在items_c之前),您可以使用Concat而不是Merge。
val items_a :: items_b :: items_c = List(
Source(List(10,20,30,40,50)),
Source(List(60,70,80,90,100),
Source(List(110,120,130,140,1500))
Source.combine(items_a, items_b, items_c : _*)(Concat(_))