超时后停止FS2流

  • 本文关键字:FS2 超时 scala fs2
  • 更新时间 :
  • 英文 :


我想使用类似于take(n: Int)的函数,但在时间维度: consume(period: Duration。因此,如果发生超时,我希望流终止。我知道我可以将流汇编为IO[List[T]]之类的东西并取消它,但是随后我将失去结果。实际上,我想将无尽的流转换为有限的流并保留结果。

更多关于问题的范围。我从消息传递经纪人那里有无休止的事件流,但是我也有旋转的凭据可以连接到经纪人。因此,我想要的是在一段时间内消耗事件流,然后停止,获得新的凭据,再次连接到创建新流的经纪人,并将两个流置于一个流中。

有一种方法,可以做到这一点:

/**
    * Interrupts this stream after the specified duration has passed.
    */
  def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]

您需要这样的东西

import scala.util.Random
import scala.concurrent.ExecutionContext
import fs2._
import fs2.concurrent.SignallingRef
implicit val ex = ExecutionContext.global
implicit val t: Timer[IO] = IO.timer(ex)
implicit val cs: ContextShift[IO] = IO.contextShift(ex)
val effect: IO[Long] = IO.sleep(1.second).flatMap(_ => IO{
  val next = Random.nextLong()
  println("NEXT: " + next)
  next
})
val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()
val timer = Stream.sleep(10.seconds).flatMap(_ => 
  Stream.eval(signal.set(true)).flatMap(_ => 
    Stream.emit(println("Finish")).covary[IO]))
val stream = timer concurrently 
Stream.repeatEval(effect).interruptWhen(signal)
stream.compile.drain.unsafeRunSync()

另外,如果要保存发布数据的结果,则需要将FS2的一些无界队列通过queue.stream.stream.stream

将已发布的数据转换为您的结果

最新更新