我正在尝试在2个并发流之间共享Ref[F, A]
。以下是实际场景的简化示例。
class Container[F[_]](implicit F: Sync[F]) {
private val counter = Ref[F].of(0)
def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))
def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
}
main函数
object MyApp extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val s = for {
container <- Ref[IO].of(new Container[IO]())
} yield {
val incrementBy2 = Stream.repeatEval(
container.get
.flatTap(c => c.incrementBy2)
.flatMap(c => container.update(_ => c))
)
.metered(2.second)
.interruptScope
val printStream = Stream.repeatEval(
container.get
.flatMap(_.printCounter)
)
.metered(1.seconds)
incrementBy2.concurrently(printStream)
}
Stream.eval(s)
.flatten
.compile
.drain
.as(ExitCode.Success)
}
}
incrementBy2
所做的更新在printStream
中不可见。我该如何解决这个问题?如果能帮助我理解这段代码中的错误,我将不胜感激。
自类定义以来,您的代码已被破坏,您甚至没有更新相同的Ref
请记住,IO
的重点是对计算的描述,因此Ref[F].of(0)
返回的程序在求值时将创建一个新的Ref
,调用多个flatMaps
将导致创建多个Refs
。
而且,你没有以正确的方式做无标签的final(有些人可能会认为即使是正确的方式也不值得:https://alexn.org/blog/2022/04/18/scala-oop-design-sample/)
这是我将如何写你的代码:
trait Counter {
def incrementBy2: IO[Unit]
def printCounter: IO[Unit]
}
object Counter {
val inMemory: IO[Counter] =
IO.ref(0).map { ref =>
new Counter {
override final val incrementBy2: IO[Unit] =
ref.update(c => c + 2)
override final val printCounter: IO[Unit] =
ref.get.flatMap(IO.println)
}
}
}
object Program {
def run(counter: Counter): Stream[IO, Unit] =
Stream
.repeatEval(counter.printCounter)
.metered(1.second)
.concurrently(
Stream.repeatEval(counter.incrementBy2).metered(2.seconds)
).interruptAfter(10.seconds)
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
Stream
.eval(Counter.inMemory)
.flatMap(Program.run)
.compile
.drain
}
PS:我实际上不会有
中进行打印printCounter
,但getCounter
并在Program
你可以看到代码在这里运行