参考更新和使用猫效应的光纤触发器



问题: 我试图解决一个问题,我需要每x分钟调度一次,我需要更新缓存,并且可以并发获取

尝试过的解决方案:

  1. 使用具有Cats效果的TrieMap和ScheduledThreadPool执行器:

我实际上是从使用TrieMap开始的,因为它提供了线程安全,并使用调度的线程池来调度更新

import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreTrieMap extends IOApp {
def callForEvery[A](f: => Unit, d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable {
cb =>
val r = new Runnable {
override def run(): Unit = cb(Right(f))
}
val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
IO(scFut.cancel(false)).void
}
}
val map = TrieMap.empty[String, String]
override def run(args: List[String]): IO[ExitCode] = {
implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
for {
_ <- callForEvery(println(map.get("token")), 1 second)
_ <- callForEvery(println(map.put("token", Random.nextString(10))), 3 second)
} yield ExitCode.Success
}
}
  1. 使用Ref和Cats效应光纤:

然后创建了一个纯猫效果的解决方案。

下面的代码会导致StackOverflow错误吗?

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulate(ref, 1 minute)
r <- keepPollingUsingFiber(ref)
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
rS <- scheduleAndPopulate(r, duration)(cs)
_ <- rS.join
} yield ()).start(cs)
}

def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingFiber(r)(cs)
_ <- w.join
} yield ()).start(cs)
}
}

我正在尝试更新一个Ref,并像另一个光纤正在更新的并发缓存一样使用Ref。我正在使用递归触发光纤创建。我知道纤维可以用于堆叠安全操作。在这种情况下,我加入了创建的旧纤维。所以想了解一下下面的代码是安全的。

更新(下面提供的答案中的解决方案)

第三个解决方案:基于其中一个答案的输入。与其为每个递归调用分叉,不如在调用方上分叉。

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiberWithIO extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulateWithIO(ref, 1 second).start
r <- keepPollingUsingIO(ref).start
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulateWithIO(r, duration)(cs)
} yield ()
}
def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingIO(r)(cs)
} yield ())
}
}

很想知道上面讨论的方法的优缺点

对于第二种方法,您可以通过不在scheduleAndPopulatekeepPollingUsingFiber中分叉Fiber来简化它。相反,保留递归调用,并在调用者中派生它们。IO是堆栈安全的,因此递归调用不会破坏堆栈。

您可以使用start来分叉每个,但parTupled可能更简单。这是parMapN的一个变体,它分叉每个效果并收集它们的结果。

(此外,在您的代码中,您不需要显式地传递隐式值,如cs,编译器将为您推断它们。)

object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
_ <- (scheduleAndPopulate(ref, 1 minute), keepPollingUsingFiber(ref)).parTupled
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulate(r, duration)
} yield ()
}
def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
_ <- keepPollingUsingFiber(r)
} yield ()
}
}

最新更新