如何将 Akka 流合并的输出通过管道传输到另一个流



我正在玩 Akka Streams,并且已经弄清楚了大部分基础知识,但我不清楚如何获取Merge的结果并对其进行进一步的操作(映射、过滤、折叠等)。

我想修改以下代码,以便我可以进一步操作数据,而不是将合并管道传输到接收器。

implicit val materializer = FlowMaterializer()
val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)
val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> sink
}.run()

我想我的主要问题是我无法弄清楚如何制作没有源的流组件,并且我无法弄清楚如何在不使用特殊的 Merge 对象和~>语法的情况下进行合并。

编辑:这个问题和答案是针对并与Akka Streams 0.11一起使用

如果你不关心元素随机向下游移动的Merge语义,那么你可以尝试concat Source,就像这样:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)

这里的区别在于,来自a的所有项目将首先流向下游,然后再流向任何b元素。 如果你真的需要 Merge 的行为,那么你可以考虑如下(请记住,你最终需要一个接收器,但你当然可以在合并后进行额外的转换):

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> transform
}.run

在此示例中,您可以看到我使用Flow同伴中的帮助程序创建了一个没有特定输入SourceFlow。 然后,我可以从那里将其附加到合并点以获得我的附加处理。

使用 Source.combine

val items_a :: items_b :: items_c = List(
         Source(List(10,20,30,40,50)), 
         Source(List(60,70,80,90,100), 
         Source(List(110,120,130,140,1500))
Source.combine(items_a, items_b, items_c : _*)(Merge(_))
         .map(_+1)
         .runForeach(println)
或者

,如果您需要保留输入源的顺序(例如items_a必须在items_b之前,items_b必须在items_c之前),您可以使用Concat而不是Merge。

val items_a :: items_b :: items_c = List(
     Source(List(10,20,30,40,50)), 
     Source(List(60,70,80,90,100), 
     Source(List(110,120,130,140,1500))
Source.combine(items_a, items_b, items_c : _*)(Concat(_))

相关内容

最新更新