如果可观察量的订阅者需要触发 OnNext() 怎么办?



我有一个跟踪状态的可观察量,有点像状态机。

我有一个可观察量的订阅者,它可以检测事物何时处于特定状态,并且它需要将事物推入新状态,从而有效地在可观察量上调用 OnNext()!

这违反了一次只有一个 OnNext() 在飞行中的 Rx 原则! 有什么好办法吗?就像安排 OnNext() 在当前 OnNext() 完成传播后直接发生一样?

在控制系统理论中,这种类型的系统是一个具有反馈的控制回路。它抵制传统的分析,因为系统的输入取决于输出 - 这可能是一个连续函数。

可以对可观察量(本质上是一个离散的值流)做同样的事情 - 即以声明方式表达控制系统。但是,C# 语义使实现不是很优雅。(F# 可以选择递归绑定)。

首先,我们需要定义一个用于编写反馈循环的闭包。

    public delegate IObservable<T> Feedback<T>(IObservable<T> feedback, out IObservable<T> output);
    public static IObservable<T> FeedbackSystem<T>(Feedback<T> closure)
    {
        IObservable<T> source = Observable.Empty<T>(), output;
        source = closure(Observable.Defer(() => source), out output);
        return output;
    }

使用上述方法,下面是速度调节器的示例实现,该调节器加速到100,并保持低于它的速度,即使速度中引入了随机误差。

    var system =
    FeedbackSystem((IObservable<double> acceleration, out IObservable<double> velocity) =>
    {
        //Time axis: moves forward every 0.1s
        double t = 0.1; var timeaxis = Observable.Interval(TimeSpan.FromSeconds(t));
        velocity = acceleration.Sample(timeaxis)                //move in time
                               .Scan((u, a) => u + a * t)   //u' = u + at
                               .Select(u => u + new Random().Next(10))  //some variations in speed
                               .Publish().RefCount();                   //avoid recalculation
        //negative feedback
        var feedback = velocity.Select(u => 0.5 * (100 - u));
        return feedback.Select(a => Math.Min(a, 15.0))  //new limited acceleration
                       .StartWith(0);                   //initial value          
    });
    system.Subscribe(Console.WriteLine);

通过以相同的方式使输入依赖于输出,状态机的情况是可能的。

顺便说一句,我会说你可能"想做别的事情"——也就是说,在可观察对象中有一个注入点,比如包含类等有一个方法PushState(...)

无论如何,你可以保留"我会告诉你下一步是什么"语义,方法是使用Subject<T>支持IObservable,或者在创建过程中指定注入路由Observable.Create

(尽管Subject<T>被很多人瞧不起,因为它是"不纯洁的")

相关内容

  • 没有找到相关文章

最新更新