具有StateT[IO,_,_]的FS2流,周期性转储状态



我有一个程序,它消耗了无限的数据流。在此过程中,我想记录一些度量,它们形成了一个monoid,因为它们只是简单的和和和平均值。我想定期在某个地方写下这些指标,清除它们,然后再累积它们。我基本上有:

object Foo {
type MetricsIO[A] = StateT[IO, MetricData, A]
def recordMetric(m: MetricData): MetricsIO[Unit] = {
StateT.modify(_.combine(m))
}
def sendMetrics: MetricsIO[Unit] = {
StateT.modifyF { s =>
val write: IO[Unit] = writeMetrics(s)
write.attempt.map {
case Left(_) => s
case Right(_) => Monoid[MetricData].empty
}
}
}
}

因此,大多数执行直接使用IO,而使用StateT.liftF提升。在某些情况下,我会包含一些对recordMetric的调用。在它的结尾,我有一个流:

val mainStream: Stream[MetricsIO, Bar] = ...

我想定期,比如每分钟左右,转储指标,所以我尝试了:

val scheduler: Scheduler = ...
val sendStream =
scheduler
.awakeEvery[MetricsIO](FiniteDuration(1, TimeUnit.Minutes))
.evalMap(_ => Foo.sendMetrics)
val result = mainStream.concurrently(sendStream).compile.drain

然后我做一些常见的顶级程序,用启动状态调用run,然后调用unsafeRunSync

问题是,我只看到过空的指标!我怀疑这与我的monoid隐式地向sendStream提供空度量有关,但我不太清楚为什么应该这样,也不知道如何修复它。也许有一种方法可以将这些sendMetrics调用"交错"到主流中?

编辑:这是一个最小完整可运行示例

import fs2._
import cats.implicits._
import cats.data._
import cats.effect._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
val sec = Executors.newScheduledThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(sec)
type F[A] = StateT[IO, List[String], A]
val slowInts = Stream.unfoldEval[F, Int, Int](1) { n =>
StateT(state => IO {
Thread.sleep(500)
val message = s"hello $n"
val newState = message :: state
val result = Some((n, n + 1))
(newState, result)
})
}
val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[F](FiniteDuration(1, SECONDS))
val slowIntsPeriodicallyClearedState = slowInts.either(ticks).evalMap[Int] {
case Left(n) => StateT.liftF(IO(n))
case Right(_) => StateT(state => IO {
println(state)
(List.empty, -1)
})
}

现在,如果我这样做:

slowInts.take(10).compile.drain.run(List.empty).unsafeRunSync

然后我得到了预期的结果——状态正确地累积到输出中。但如果我这样做:

slowIntsPeriodicallyClearedState.take(10).compile.drain.run(List.empty).unsafeRunSync

然后我看到一个空的列表一直打印出来。我本来希望打印出部分列表(大约2个元素(。

StateT与效果类型一起使用是不安全的,因为它在面对并发访问时是不安全。相反,考虑使用Ref(来自fs2或cats效果,具体取决于哪个版本(。

类似这样的东西:

def slowInts(ref: Ref[IO, Int]) = Stream.unfoldEval[F, Int, Int](1) { n =>
val message = s"hello $n"
ref.modify(message :: _) *> IO {
Thread.sleep(500)
val result = Some((n, n + 1))
result
}
}
val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[IO](FiniteDuration(1, SECONDS))
def slowIntsPeriodicallyClearedState(ref: Ref[IO, Int] = 
slowInts.either(ticks).evalMap[Int] {
case Left(n) => IO.pure(n)
case Right(_) =>
ref.modify(_ => Nil).flatMap { case Change(previous, now) => 
IO(println(now)).as(-1)
}
}

相关内容

  • 没有找到相关文章

最新更新