我正在尝试开发一个Rx Builder,以便在F#计算表达式语法中使用反应扩展。我该如何修理它,使它不会炸掉烟囱?就像下面的Seq示例一样。有没有计划将RxBuilder的实现作为响应式扩展的一部分或作为.NET Framework未来版本的一部分?
open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =
member this.Delay f = Observable.Defer f
member this.Combine (xs: IObservable<_>, ys : IObservable<_>) =
Observable.merge xs ys
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x
yield! f (x + 1) }
let rec g x = rx { yield x
yield! g (x + 1) }
//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore
一个简短的答案是,Rx Framework不支持使用这样的递归模式生成可观测值,因此这不容易做到。用于F#序列的Combine
运算需要一些可观察器无法提供的特殊处理。Rx框架可能期望您使用Observable.Generate
生成可观测值,然后使用LINQ查询/F#计算生成器来处理它们。
不管怎样,这里有一些想法-
首先,您需要将Observable.merge
替换为Observable.Concat
。第一个并行运行两个可观测值,而第二个首先从第一个可观测结果中产生所有值,然后从第二个可观测数据中产生值。更改后,代码段将在堆栈溢出之前至少打印~800个数字。
堆栈溢出的原因是Concat
创建了一个调用Concat
的可观察对象,以创建另一个调用Concat
的可观测对象等。解决此问题的一种方法是添加一些同步。如果您使用的是Windows窗体,那么您可以修改Delay
,以便它在GUI线程上调度可观察对象(这将丢弃当前堆栈)。这是一个草图:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
为了正确地实现这一点,您必须编写自己的Concat
方法,这是非常复杂的。想法是:
- Concat返回一些特殊类型,例如
IConcatenatedObservable
- 当递归调用该方法时,您将创建一个相互引用的
IConcatenatedObservable
链 Concat
方法将查找此链,当例如有三个对象时,它将丢弃中间的一个(以始终保持长度最多为2的链)
这对于StackOverflow的回答来说有点太复杂了,但它可能对Rx团队来说是一个有用的反馈。
请注意,Rx v2.0中已经修复了这一问题(如前所述),更广泛地说,它适用于所有排序运算符(Concat、Catch、OnErrorResumeText)以及命令运算符(If、While等)。
基本上,您可以将这类运算符视为订阅终端观察器消息中的另一个序列(例如,Concat在接收到当前序列的OnCompleted消息后订阅下一个序列),这就是尾部递归类比的作用所在
在Rx v2.0中,所有的尾部递归订阅都被扁平化为一个类似队列的数据结构,以便一次处理一个,并与下游观察者对话。这避免了观察者在连续序列订阅中相互交谈的无限增长。
这在Rx 2.0 Beta中已经修复。这是一个测试。
这样的东西怎么样?
type rxBuilder() =
member this.Delay (f : unit -> 'a IObservable) =
{ new IObservable<_> with
member this.Subscribe obv = (f()).Subscribe obv }
member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
{ new IObservable<_> with
member this.Subscribe obv = xs.Subscribe obv ;
ys.Subscribe obv }
member this.Yield x = Observable.Return x
member this.YieldFrom xs = xs
let rx = rxBuilder()
let rec f x = rx { yield x
yield! f (x + 1) }
do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore
do System.Console.ReadLine() |> ignore
http://rxbuilder.codeplex.com/(为试验RxBuilder而创建)
xs一次性用品没有接线。我一试着用电线把一次性用品捆起来,它就又开始炸了。
如果我们从这个计算表达式(又名Monad)中去掉语法糖,我们将得到:
let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )
或者在C#中:
public static IObservable<int> g(int x)
{
return Observable.Defer<int>(() =>
{
return Observable.Merge(Observable.Return(x), g(x + 1));
});
}
这绝对不是尾部递归。我认为如果你能让它尾递归,那么它可能会解决你的问题