如何异步中断fs2流



我正试图用SignalRef中断fs2流。我用以下内容设置并运行流。当switch包含false时,流应运行,当switch包含true时,流中断

import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}

program.compile.drain.unsafeRunAsync(() => _)

然后我尝试用中断流

switch.map(_.set(true).unsafeRunSync)

然而,这股洪流仍在继续。在stdout中,我看到

15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false

所以很明显,它并没有转变为真的?

您的代码有几个问题。

首先,请检查switch:的签名

val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

类型的SignallingRef被封装到IO中。这意味着新SignallingRef的创建被暂停,直到IOmonad被评估(通过IO程序流隐式地或通过调用unsafeRunXXX显式地(。所以这个值更合适的名称可能是createSwitch

当您每次创建默认值为falseSignallingRef的新实例时,实际上都使用switch.map(_.get).unsafeRunSync,因此它永远不会被求值为true。

经验法则是,在完成IO/Stream程序的组装之前,不应该(几乎(调用unsafeRunXXX方法,然后应该运行一次这种方法。

正确的方法是创建一次switch进行理解,然后将其作为自变量传递给program

我对你的代码进行了一点重构,以完成我认为你打算做的事情,还添加了一些澄清注释。

import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
//I pass here switch as method's param
def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
Stream
.repeatEval {
for { //I used for-comprehension to split IO into 3 statements
switchValue <- switch.get //here I get value of switch
_ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
_ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
} yield ()
}
.metered(1.second)
for {
switch <- Stream.eval(createSwitch)
//here I create effect to set switch to true after 10 seconds and then use start to run it
//separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
_ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
_ <- program(switch).interruptWhen(switch)
} yield ()
}
program.compile.drain.unsafeRunSync()

就我个人而言,我使用自己的终止开关来完成以下操作:

final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {
def apply[F[_]: Sync]: F[KillSwitch[F]] =
Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}

我使用它或多或少像:

for {
KillSwitch(kfStream, switch) <- KillSwitch[F]
streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
// after a while
_ <- switch // closes kfStream
result <- streamFiber.join
} yield result

(假设这是一个伪代码来展示这个想法(。

相关内容

  • 没有找到相关文章

最新更新