我有一个IObservable<double>
,它以固定的间隔给出从传感器读取的值。我想在传感器的值长时间超出界限时发出信号。
作为一个具体的例子,假设可观察到的是温度传感器。我想监测温度超过100°C的时间5秒。也就是说,从IObservable<double>
产生一个可观察的或事件,当温度超过 100°C 5 秒时,该事件会触发一次。如果任意数量的样品的温度读数高于 100°C 少于 5 秒,则没有影响。在一组样品中的第一个都高于 100°C 后 5 秒,它应该会发出信号。如果该值继续高于 100°C,则在温度降至 100°C 以下然后再次超过该温度 5 秒之前,不应重新升高信号。
对于反应式扩展来说,这似乎应该很简单,但是作为新手,我找不到任何东西。我翻遍了 http://reactivex.io,http://introtorx.com 但没有找到任何东西。我可能只是不知道要寻找的正确术语。
我想了一下,意识到我们可能想多了。
让我们从一个可观察的阈值开始:
var threshold =
source
.Select(temp => temp > 100)
.DistinctUntilChanged();
当更改为高于 100 或低于 100 时,这将生成一个值。如果它继续高于 100,或保持在 100 以下,则不会产生任何新值。
现在让我们定义:
var alarmUp =
threshold
.Throttle(TimeSpan.FromSeconds(5))
.Where(cond => cond == true);
在这里,如果条件在 5 秒内没有更改,则throttle
运算符会吐出一个值。现在它可以true
(> 100(5秒,也可以false
(<100(。 我们只对它true
感兴趣(> 100(。
在此期间,如果有任何变化,油门操作员将被重置,因此条件必须保持至少 5 秒。
当新值传入时,当您想要忽略传入的任何先前值的结果时,您应该使用.Switch()
。
下面是您需要的查询:
IObservable<Unit> query =
source
.Select(x => x > 100.0)
.DistinctUntilChanged()
.Select(x => x
? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default)
: Observable.Never<Unit>())
.Switch();
.Select(x => x >= 100.0).DistinctUntilChanged()
组合将源更改为仅在传感器在x > 100.0
和x <= 100.0
之间翻转时才触发的IObservable<bool>
- 超过 100°C 时为true
,100°C 或更低时为假。
现在我们将IObservable<bool>
变成IObservable<IObservable<Unit>>
.这是一个产生其他可观察量的可观察量。当我们收到true
时,我想返回一个在5.0
秒后触发的IObservable<Unit>
,当我们收到一个false
时,我想返回一个根本不返回值的可观察量。
这就是.Select(x => x ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default) : Observable.Never<Unit>())
所做的。
最后,我们弹出.Switch()
,通过仅根据产生的最后一个内在IObservable<Unit>
生成值,将IObservable<IObservable<Unit>>
更改为IObservable<Unit>
。换句话说,如果传感器在5.0
秒内再次从上方100.0
翻转到下方,则它将忽略Observable.Timer(TimeSpan.FromSeconds(5.0))
的值并等待来自Observable.Never<Unit>()
的值。如果超过100.0
超过5.0
则Observable.Timer(TimeSpan.FromSeconds(5.0))
会触发,您将获得从查询生成的Unit
。
这完全按照您想要的方式运行。
下面是查询的一个稍微简单的版本:
IObservable<Unit> query =
source
.Select(x => x > 100.0)
.DistinctUntilChanged()
.Select(x => x
? Observable.Timer(TimeSpan.FromSeconds(5.0))
: Observable.Never<long>())
.Switch()
.Select(x => Unit.Default);
应该可以使用内置运算符(函数样式(来做到这一点,但是通过实现具有命令式逻辑的自定义运算符来做到这一点更为严格。
public static IObservable<Unit> Alarm<T>(this IObservable<T> source,
T threshold, TimeSpan delay, IComparer<T> comparer = null)
{
comparer = comparer ?? Comparer<T>.Default;
return Observable.Create<Unit>(o =>
{
Stopwatch stopwatch = new Stopwatch();
int alarmState = 0; // 0: OK, 1: above threshold, 2: signal transmitted
return source.Subscribe(x =>
{
if (comparer.Compare(x, threshold) >= 0)
{
if (alarmState == 0)
{
alarmState = 1;
stopwatch.Restart();
}
else if (alarmState == 1 && stopwatch.Elapsed >= delay)
{
alarmState = 2;
o.OnNext(Unit.Default);
}
}
else
{
alarmState = 0;
}
}, o.OnError, o.OnCompleted);
});
}