我正试图用SignalRef中断fs2流。我用以下内容设置并运行流。当switch
包含false
时,流应运行,当switch
包含true
时,流中断
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)
然后我尝试用中断流
switch.map(_.set(true).unsafeRunSync)
然而,这股洪流仍在继续。在stdout中,我看到
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false
所以很明显,它并没有转变为真的?
您的代码有几个问题。
首先,请检查switch
:的签名
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
类型的SignallingRef
被封装到IO
中。这意味着新SignallingRef
的创建被暂停,直到IO
monad被评估(通过IO
程序流隐式地或通过调用unsafeRunXXX
显式地(。所以这个值更合适的名称可能是createSwitch
。
当您每次创建默认值为false
的SignallingRef
的新实例时,实际上都使用switch.map(_.get).unsafeRunSync
,因此它永远不会被求值为true。
经验法则是,在完成IO/Stream程序的组装之前,不应该(几乎(调用unsafeRunXXX
方法,然后应该运行一次这种方法。
正确的方法是创建一次switch
进行理解,然后将其作为自变量传递给program
。
我对你的代码进行了一点重构,以完成我认为你打算做的事情,还添加了一些澄清注释。
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
//I pass here switch as method's param
def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
Stream
.repeatEval {
for { //I used for-comprehension to split IO into 3 statements
switchValue <- switch.get //here I get value of switch
_ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
_ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
} yield ()
}
.metered(1.second)
for {
switch <- Stream.eval(createSwitch)
//here I create effect to set switch to true after 10 seconds and then use start to run it
//separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
_ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
_ <- program(switch).interruptWhen(switch)
} yield ()
}
program.compile.drain.unsafeRunSync()
就我个人而言,我使用自己的终止开关来完成以下操作:
final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {
def apply[F[_]: Sync]: F[KillSwitch[F]] =
Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}
我使用它或多或少像:
for {
KillSwitch(kfStream, switch) <- KillSwitch[F]
streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
// after a while
_ <- switch // closes kfStream
result <- streamFiber.join
} yield result
(假设这是一个伪代码来展示这个想法(。