不完全理解Source.delay方法或akka-stream文档中存在错误



我正在阅读有关KillSwitch的akka-stream文档,他们有一个示例来说明KillSwitch.shutdown方法:

val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2

我无法理解为什么预期结果应该是 2。正如我看到这个例子时,流被置于 1 秒延迟。暂停时,将调用shutdown(),以便终止开关在延迟完成之前告诉流关闭。我不明白为什么流的前 2 个元素应该被发射并输送到接收器。

你能帮忙解释一下吗?

注意:如果我运行此示例,我会得到以下异常:

Exception in thread "main" java.util.NoSuchElementException: last of empty stream
    at akka.stream.scaladsl.Sink$.$anonfun$last$3(Sink.scala:181)

对示例代码存在误解。结果完全取决于 doSomethingElse 的运行时持续时间。只有当花费的时间太少时,你才会得到异常。要对此进行测试,您可以将其替换为 Thread.sleep(2000) 您将从Sink得到一个结果。如果您增加睡眠值,结果也会增加。

关于您在评论中的问题:

delay将元素发射时间偏移指定量。延迟精度为 10ms,以避免不必要的计时器调度周期。这就是您看到这种行为的原因(您可以在 Flow 的 Scala 文档中查看这些详细信息(。

如果您想每秒发送一条消息,请尝试throttle

.throttle(1, 1.second, 1, ThrottleMode.shaping)

最新更新