我有F#代码,如下所示:
module O = Control.Observable
//...
use ss = serve' 4000
|> O.subscribe
(fun c -> use cs = RSS.items
|> O.subscribe (bytes >> c.SendAll) |> ignore)
其中
serve' : int -> IObservable<Socket>
c : Socket
RSS.items : IObservable<XElement>
bytes : XElement -> byte []
c.SendAll : byte [] -> unit
- 在
c.SendAll
失败之前保留cs
最常用的方法是什么 - 是否存在或可能定义
Observable.subscribeUntilError(action)
,其中如果action
失败,则处理订阅;否则只要IObservable
一直推,action
就运行
我想出了这个:
let inline Δ<'a> = Unchecked.defaultof<'a>
let inline LOG x = printf "%A" x
module O = Observable
let d = ref (Δ:IDisposable)
let b x = try a x with e -> LOG e; let s = !d in if s <> Δ then s.Dispose()
d := o |> O.subscribe b
{
new IDisposable with
member x.Dispose() = let s = !d in if s <> Δ then d := Δ; s.Dispose()
}
要演示差异,请尝试main
!
使用subscribe
:
use s = new Subject<int>()
use x = s |> O.subscribe (fun _ -> raise <| Exception ())
use y = s |> O.subscribe (printf "%i")
s.OnNext 20
应用程序崩溃:
Unhandled Exception: System.Exception: Exception of type 'System.Exception' was thrown.
at Microsoft.FSharp.Core.Operators.Raise[T](Exception exn)
at Program.System.x@604-5.Invoke(Int32 _arg1) in C:EnioxEniox.News.GoogleEniox.News.GoogleProgram.fs:line 60
at Microsoft.FSharp.Control.CommonExtensions.SubscribeToObservable@1915.System-IObserver`1-OnNext(T value)
at System.Reactive.Observer`1.OnNext(T value)
at System.Reactive.Subjects.Subject`1.OnNext(T value)
at Program.System.main(String[] args) in C:EnioxEniox.News.GoogleEniox.News.GoogleProgram.fs:line 606
现在使用subscribeUE
:
use s = new Subject<int>()
use x = s |> O.subscribeUE (fun _ -> raise <| Exception ())
use y = s |> O.subscribe (printf "%i")
s.OnNext 20
愉快地处理了订阅x
,应用程序继续运行,没有出现任何问题,并正常退出!使用LOG = ignore
:输出
20
我很想知道RX 2.0中是否真的存在类似的功能,因为我发现这个组合子太有用了,不能忽略。