FS2:是否可以优雅地完成队列?

  • 本文关键字:队列 是否 FS2 scala fs2
  • 更新时间 :
  • 英文 :


假设我想将一些遗留的异步 API 转换为 FS2 流。 API 提供了一个包含 3 个回调的接口:下一个元素、成功、错误。 我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。

FS2 指南(https://functional-streams-for-scala.github.io/fs2/guide.html)建议在这种情况下使用fs2.Queue, 它非常适合排队,但到目前为止我看到的所有示例都预计queue.dequeue返回的流永远不会完成 - 在我的情况下,没有明显的方法来处理成功/错误回调。 我尝试使用queue.dequeue.interruptWhen(...here goes the signal...),但如果成功/错误回调在客户端从流中读取数据之前到达, 流过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。

FS2 可以做到这一点吗?使用Akka Streams,这是微不足道的 -SourceQueueWithCompletecompletefail的方法。

更新: 通过将元素包装在 Option 中并将 None 视为停止读取流的信号,以及通过使用 Promise 传播错误,我能够获得足够好的结果:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

但是,我是否忽略了更自然的做这些事情的方式?

执行此操作的一种惯用方法是创建一个Queue[Option[A]]而不是Queue[A]。排队时,换入Some,您可以显式排队None以发出完成信号。在取消排队方面,执行q.dequeue.unNoneTerminate,这为您提供了一个Stream[F, A],一旦队列发出None

更新的答案:将unNoneTerminaterethrow结合起来,它会接受一个Stream[F, Either[Throwable, A]]并返回一个Stream[F, A],当它包含投掷对象时

,该错误与Stream.raiseError

。然后,您的完整堆栈将是一个Stream[F, Either[Throwable, Option[A]]],您可以通过调用.rethrow.unNoneTerminate解包到Stream[F,A]

相关内容

  • 没有找到相关文章

最新更新