如何更改Rx Builder实现以修复堆栈溢出异常



我正在尝试开发一个Rx Builder,以便在F#计算表达式语法中使用反应扩展。我该如何修理它,使它不会炸掉烟囱?就像下面的Seq示例一样。有没有计划将RxBuilder的实现作为响应式扩展的一部分或作为.NET Framework未来版本的一部分?

open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x 
                    yield! f (x + 1) }
let rec g x = rx { yield x 
                    yield! g (x + 1) }

//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore

一个简短的答案是,Rx Framework不支持使用这样的递归模式生成可观测值,因此这不容易做到。用于F#序列的Combine运算需要一些可观察器无法提供的特殊处理。Rx框架可能期望您使用Observable.Generate生成可观测值,然后使用LINQ查询/F#计算生成器来处理它们。

不管怎样,这里有一些想法-

首先,您需要将Observable.merge替换为Observable.Concat。第一个并行运行两个可观测值,而第二个首先从第一个可观测结果中产生所有值,然后从第二个可观测数据中产生值。更改后,代码段将在堆栈溢出之前至少打印~800个数字。

堆栈溢出的原因是Concat创建了一个调用Concat的可观察对象,以创建另一个调用Concat的可观测对象等。解决此问题的一种方法是添加一些同步。如果您使用的是Windows窗体,那么您可以修改Delay,以便它在GUI线程上调度可观察对象(这将丢弃当前堆栈)。这是一个草图:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

为了正确地实现这一点,您必须编写自己的Concat方法,这是非常复杂的。想法是:

  • Concat返回一些特殊类型,例如IConcatenatedObservable
  • 当递归调用该方法时,您将创建一个相互引用的IConcatenatedObservable
  • Concat方法将查找此链,当例如有三个对象时,它将丢弃中间的一个(以始终保持长度最多为2的链)

这对于StackOverflow的回答来说有点太复杂了,但它可能对Rx团队来说是一个有用的反馈。

请注意,Rx v2.0中已经修复了这一问题(如前所述),更广泛地说,它适用于所有排序运算符(Concat、Catch、OnErrorResumeText)以及命令运算符(If、While等)。

基本上,您可以将这类运算符视为订阅终端观察器消息中的另一个序列(例如,Concat在接收到当前序列的OnCompleted消息后订阅下一个序列),这就是尾部递归类比的作用所在

在Rx v2.0中,所有的尾部递归订阅都被扁平化为一个类似队列的数据结构,以便一次处理一个,并与下游观察者对话。这避免了观察者在连续序列订阅中相互交谈的无限增长。

这在Rx 2.0 Beta中已经修复。这是一个测试。

这样的东西怎么样?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs
let rx = rxBuilder()
let rec f x = rx { yield x 
                   yield! f (x + 1) }
do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore
do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/(为试验RxBuilder而创建)

xs一次性用品没有接线。我一试着用电线把一次性用品捆起来,它就又开始炸了。

如果我们从这个计算表达式(又名Monad)中去掉语法糖,我们将得到:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

或者在C#中:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

这绝对不是尾部递归。我认为如果你能让它尾递归,那么它可能会解决你的问题

相关内容

  • 没有找到相关文章

最新更新