fs2流在延迟时不中断



此处不中断fs2流:

import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global
val test = for {
cancel <- Deferred[IO, Either[Throwable, Unit]]
_ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) }).start
_ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain
} yield ()
test.unsafeRunSync()

但是如果我们交换线路和光纤,它会中断:

import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global
val test = for {
cancel <- Deferred[IO, Either[Throwable, Unit]]
_ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain.start
_ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) })
} yield ()
test.unsafeRunSync()

问题是您没有正确使用IO
记住IO[A]只是一个程序描述,一个值。它自己不做任何事。

当你调用cancel.complete时,你只是在创建一个新程序,它不做任何事情,除非你将它与其他程序组合在一起。你用map方法合成它;它并没有真正地组合程序,所以你的cancel丢失了,start只会创建一个纤维,它会创建这样一个程序并丢弃它。

在第二个示例中,由于for将所有内容转换为flatMap,因此您最终意外地编写了程序。

快速的解决方案是在第一个示例中使用flatMap而不是map。但是,恕我直言,更好的解决方案是使用像这样的合适组合子:

val run: IO[Unit] =
Deferred[IO, Either[Throwable, Unit]].flatMap { cancelToken =>
val cancel =
IO.sleep(5.seconds) >>
IO.println("Completing deferred") >>
cancelToken.complete(Right(()))
val program =
IO.println("Starting stream") >>
Stream
.awakeEvery[IO](1.second)
.foreach(x => IO.println(x))
.interruptWhen(cancelToken)
.compile
.drain >>
IO.println("Stream finished")
cancel.background.surround(program)
}

你可以看到代码在这里运行

最新更新