阿卡流中的流量问题



我在 Akka 流中很新,我已经使用 Rx 一段时间了,所以我对所有运算符都了如指掌,但我无法知道为什么我的管道不发出值

这是我的代码

  @Test def mainFlow(): Unit = {
    val increase = Flow[Int]
      .map(value => value * 10)
    val filterFlow = Flow[Int]
      .filter(value => value > 50)
      .take(2)
    Source(0 to 10)
      .via(increase)
      .via(filterFlow)
      .to(Sink.foreach(value => println(s"Item emitted:$value")))
      .run()
  }

第一个 Flow 转换源中发出的值乘以 10,第二个流过滤器仅获得高于 50 的项目,然后我只得到 2,所以我期望在 Sink 60 和 70但它什么也没发出。

知道为什么吗?

您的流已正确构建,并发出您提到的 2 个元素。我相信问题出在您的测试上。也就是说,流异步运行,您的测试是一个普通的Unit过程。因此,测试不会等到流运行。

您需要在测试中引入一些同步来执行断言。一种方法是使用 ScalaTest 中的ScalaFutures特征,它为您提供了一种futureValue方法。

val increase = Flow[Int]
  .map(value => value * 10)
val filterFlow = Flow[Int]
  .filter(value => value > 50)
  .take(2)
Source(0 to 10)
  .via(increase)
  .via(filterFlow)
  .runForeach(value => println(s"Item emitted:$value"))
  .futureValue

请注意,.to(Sink.foreach{...}).run()不会公开需要同步的Future[Done]。你的代码需要改成.toMat(Sink.foreach{...})(Keep.right).run(),可以缩写为.runForeach(...)

因为你说的是以下内容:

对于数字 1..10 将它们乘以 10,但只产生 2 个前元素,然后保留所有大于 50 的元素,然后打印它们。

此外,您的测试不会等待 RunnableFlow 的完成,这通常意味着您的程序将在流有机会运行之前退出(Akka 流异步运行)。

请注意,对于您的示例,没有理由使用 GraphDSL,您的代码与以下内容相同:

Source(1 to 10).map(_ * 10).take(2).filter(_ > 50).runForeach(println)

但是由于它并没有真正做任何"有意义的异步"的事情,我认为你会更好:

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println)

但话又说回来,以代码的当前状态,它等效于以下表达式:

()

最新更新