我有一个由库提供的IObservable
,它侦听来自外部服务的事件:
let startObservable () : IObservable<'a> = failwith "Given"
对于每个收到的事件,我想执行一个返回Async
的操作:
let action (item: 'a) : Async<unit> = failwith "Given"
我正在尝试在以下行中实现处理器
let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable
我已经编造了mapAsync
和AwaitObservable
:理想情况下,它们将由某个图书馆提供,到目前为止我还没有找到。
额外要求:
操作应按顺序执行,以便在处理前一个事件时缓冲后续事件。
如果操作引发错误,我希望我的处理器完成。否则,它将永远不会完成。
应尊重通过
Async.Start
传递的取消令牌。
关于我应该使用的库的任何提示?
由于您要将基于推送的模型(IObservable<>
(转换为基于拉取的模型(Async<>
(,因此您需要排队以缓冲来自可观察的数据。如果队列是大小限制的 - 哪个 tbh。应该是为了使整个管道安全,以免内存泛滥 - 那么还需要缓冲区溢出的策略。
- 一种方法是实现
MailboxProcessor<>
和自定义可观察量,这会将数据发布到它。由于 MP 是本机 F# 执行组件实现,因此它能够使用队列进行有序处理以缓冲峰值。 -
另一种选择是使用
FSharp.Control.AsyncSeq
(特别是AsyncSeq.ofObservableBuffered函数(,它将可观察变为基于拉取的异步枚举 - 在它下面使用邮箱处理器从第一点:startObservable() |> AsyncSeq.ofObservableBuffered |> AsyncSeq.iterAsync action
Hopac可以很容易地与IObservables
互操作 您可以使用Job.toAsync
将HopacJobs
转换为Async
open System
open Hopac
let startObservable () : IObservable<'a> = failwith "Given"
let action (item: 'a) : Job<unit> = failwith "Given"
let processor () : Job<unit> =
startObservable()
|> Stream.ofObservable
|> Stream.mapJob action
|> Stream.iter