如何正确地阻止来自外部的阿卡流



我正在设计一个将生成CSV测试数据的小工具。我想使用Akka Streams (1.0-RC4)来实现数据流。将有一个源生成随机数,转换成CSV字符串,一些速率限制器和一个写入文件的接收器。

也应该有一个干净的方法来停止工具使用一个小的REST接口。

这就是我挣扎的地方。流启动后(RunnableFlow.run())似乎没有办法停止它。源和接收是无限的(至少在磁盘满之前:)),所以它们不会停止流。

在Source或Sink中添加控制逻辑感觉不对。也使用ActorSystem.shutdown()。停止流的好方法是什么?

好的,所以我找到了一个体面的解决方案。它就在我的鼻子底下,只是我没有看到。Source.lazyEmpty具体化为一个承诺,当它完成时将终止源和它后面的流。

剩下的问题是,如何将它包含到无限的随机数流中。我试过Zip。结果是没有随机数通过流,因为lazyEmpty从不发出值(doh)。我尝试了Merge,但流从未终止,因为Merge一直持续到所有源完成。

我写了我自己的归并。它从其中一个输入端口转发所有值,并在任何源完成时终止。

object StopperFlow {
  private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
    val in = newInlet[A]("in")
    val stopper = newInlet[Unit]("stopper")
    override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
  }
  private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
    new StopperMergeShape(), Attributes.name("StopperMerge")) {
    import FlexiMerge._
    override def createMergeLogic(p: PortT) = new MergeLogic[In] {
      override def initialState =
        State[In](Read(p.in)) { (ctx, input, element) =>
          ctx.emit(element)
          SameState
        }
      override def initialCompletionHandling = eagerClose
    }
  }
  def apply[In](): Flow[In, In, Promise[Unit]] = {
    val stopperSource = Source.lazyEmpty[Unit]
    Flow(stopperSource) { implicit builder =>
      stopper =>
        val stopperMerge = builder.add(new StopperMerge[In]())
        stopper ~> stopperMerge.stopper
        (stopperMerge.in, stopperMerge.out)
    }
  }    
}

流可以插入到任何流中。当实现时,它将返回一个Promise,在完成时终止流。这是我的测试。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
  val timeSinceStart = System.currentTimeMillis() - startTime
  System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)

我希望这对其他人也有用。仍然让我感到困惑的是为什么没有内置的方式来终止从流外部的流

不是完全停止,而是限制。您可以使用limittake

来自Streams Cookbook的例子:

val MAX_ALLOWED_SIZE = 100
// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
  mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq)
// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
  mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)

你可以使用Akka KillSwitches来中止(失败)或关闭流。

killswitches有两种类型:

  • UniqueKillSwitch,只针对单个流。
  • SharedKillSwitch,可以一次关闭多个流。

代码示例可以在链接中找到,但这里有一个使用共享killswitch终止多个流的示例:

val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val error = new RuntimeException("boom!")
sharedKillSwitch.abort(error)
Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error

最新更新