我有一个跟踪状态的可观察量,有点像状态机。
我有一个可观察量的订阅者,它可以检测事物何时处于特定状态,并且它需要将事物推入新状态,从而有效地在可观察量上调用 OnNext()!
这违反了一次只有一个 OnNext() 在飞行中的 Rx 原则! 有什么好办法吗?就像安排 OnNext() 在当前 OnNext() 完成传播后直接发生一样?
在控制系统理论中,这种类型的系统是一个具有反馈的控制回路。它抵制传统的分析,因为系统的输入取决于输出 - 这可能是一个连续函数。
可以对可观察量(本质上是一个离散的值流)做同样的事情 - 即以声明方式表达控制系统。但是,C# 语义使实现不是很优雅。(F# 可以选择递归绑定)。
首先,我们需要定义一个用于编写反馈循环的闭包。
public delegate IObservable<T> Feedback<T>(IObservable<T> feedback, out IObservable<T> output);
public static IObservable<T> FeedbackSystem<T>(Feedback<T> closure)
{
IObservable<T> source = Observable.Empty<T>(), output;
source = closure(Observable.Defer(() => source), out output);
return output;
}
使用上述方法,下面是速度调节器的示例实现,该调节器加速到100,并保持低于它的速度,即使速度中引入了随机误差。
var system =
FeedbackSystem((IObservable<double> acceleration, out IObservable<double> velocity) =>
{
//Time axis: moves forward every 0.1s
double t = 0.1; var timeaxis = Observable.Interval(TimeSpan.FromSeconds(t));
velocity = acceleration.Sample(timeaxis) //move in time
.Scan((u, a) => u + a * t) //u' = u + at
.Select(u => u + new Random().Next(10)) //some variations in speed
.Publish().RefCount(); //avoid recalculation
//negative feedback
var feedback = velocity.Select(u => 0.5 * (100 - u));
return feedback.Select(a => Math.Min(a, 15.0)) //new limited acceleration
.StartWith(0); //initial value
});
system.Subscribe(Console.WriteLine);
通过以相同的方式使输入依赖于输出,状态机的情况是可能的。
顺便说一句,我会说你可能"想做别的事情"——也就是说,在可观察对象中有一个注入点,比如包含类等有一个方法PushState(...)
无论如何,你可以保留"我会告诉你下一步是什么"语义,方法是使用Subject<T>
支持IObservable
,或者在创建过程中指定注入路由Observable.Create
(尽管Subject<T>
被很多人瞧不起,因为它是"不纯洁的")