在 F# 中混合 IObservable 和 Async<'a>



我有一个由库提供的IObservable,它侦听来自外部服务的事件:

let startObservable () : IObservable<'a> = failwith "Given"

对于每个收到的事件,我想执行一个返回Async的操作:

let action (item: 'a) : Async<unit> = failwith "Given"

我正在尝试在以下行中实现处理器

let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable

我已经编造了mapAsyncAwaitObservable:理想情况下,它们将由某个图书馆提供,到目前为止我还没有找到。

额外要求:

  • 操作应按顺序执行,以便在处理前一个事件时缓冲后续事件。

  • 如果操作引发错误,我希望我的处理器完成。否则,它将永远不会完成。

  • 应尊重通过Async.Start传递的取消令牌。

关于我应该使用的库的任何提示?

由于您要将基于推送的模型(IObservable<>(转换为基于拉取的模型(Async<>(,因此您需要排队以缓冲来自可观察的数据。如果队列是大小限制的 - 哪个 tbh。应该是为了使整个管道安全,以免内存泛滥 - 那么还需要缓冲区溢出的策略。

  1. 一种方法是实现MailboxProcessor<>和自定义可观察量,这会将数据发布到它。由于 MP 是本机 F# 执行组件实现,因此它能够使用队列进行有序处理以缓冲峰值。
  2. 另一种选择是使用FSharp.Control.AsyncSeq(特别是AsyncSeq.ofObservableBuffered函数(,它将可观察变为基于拉取的异步枚举 - 在它下面使用邮箱处理器从第一点:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action
    

Hopac可以很容易地与IObservables互操作 您可以使用Job.toAsync将HopacJobs转换为Async

open System
open Hopac
let startObservable () : IObservable<'a> = failwith "Given"
let action (item: 'a) : Job<unit> = failwith "Given"
let processor () : Job<unit> =
startObservable()
|> Stream.ofObservable
|> Stream.mapJob action
|> Stream.iter

相关内容

  • 没有找到相关文章

最新更新