FS2.流挂在两次



问题:

我想反复从某些第三方库提供的fs2.Stream中获取一些批处理,因此将客户端从fs2.Stream本身抽象出来,并在准备就绪后立即简单地给他们F[List[Int]]批处理。

尝试:我尝试使用fs2.Stream::take并运行了一些示例。

我。

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()
r.unsafeRunSync()

它打印第一批List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)然后挂起。我预计从01000的所有批次都将被打印出来。

在这里保持简单一点是

第二。

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
_ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()
r.unsafeRunSync()

行为与完全一样。打印List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)然后挂起。

问题:

给定一个fs2.Stream[IO, Int]如何提供一个效果IO[List[Int]],该效果在评估时循环访问流提供的连续批次?

好吧,您不能有一个表示多个批次的IO[List[X]]IO将是单个批次。

你能做的最好的事情就是这样:

def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]

也就是说,您的用户将为您提供要为每个批处理执行的操作,您将为他们提供一个IO,该将阻止当前光纤,直到使用该函数消耗整个流。

实现此类函数的简单方法是:

def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
getStreamFromThirdParty
.chunkN(n = ChunkSize)
.evalMap(chunk => process(chunk.toList))
.compile
.drain

相关内容

  • 没有找到相关文章

最新更新