反应式扩展:连续出现 3 次记录



我有一个热门的可观察对象 -

类表记录

{
    string propertyA;
    DateTime dt;
    Dictionary<string,int> checkThreshHoldValues;
}  

我需要检查字典中存在的每个值,如果 3 个连续值超过阈值。如果是,我需要为该属性生成一个新事件。(在这里,每个 reacord 将以不同的 15 分钟出现,即 4 小时内有 1 条记录。

Class GeneratedRecord
{
  string propertyA;
  DateTime lastTimeObserved;
  string dictionaryPropertyName;
  int lastValue
}

我知道我必须将groupByTil与缓冲区一起使用,但不知何故我无法思考,如何在保持条件的情况下将字典拆分为单独的部分任何帮助将不胜感激。

示例:假设我有属性A值在{"TownA","TownB","TownC"}中假设我在字典中的属性是{"污染","温度","压力"}

比方说,我是每 15 分钟的 recv 值,所以我的数据是 -

{"TownA" ,"6/14/2015 10:00",{ {"Pollution",61},{"Temperature",48},{"Pressure",40}}}
{"TownA" ,"6/14/2015 10:15",{ {"Pollution",63},{"Temperature",63},{"Pressure",40}}}
{"TownA" ,"6/14/2015 10:30",{ {"Pollution",49},{"Temperature",64},{"Pressure",40}}}
{"TownA" ,"6/14/2015 10:45",{ {"Pollution",70},{"Temperature",65},{"Pressure",40}}}

假设 Threshhold 值为 60。因此,在上面的例子中,只有温度满足条件,因为它的值为 {63,64,65 } 。对于污染,只有连续 2 个值超过 60

问候

您应该能够使用重叠窗口执行此操作,但我无法制定语法,以便在每次收到元素时启动一个新窗口并在包含三个项目时关闭窗口。但这里有一个基于 Observable 的概念解决方案。创建和维护队列。在本例中,我使用间隔和一个简单的阈值 (>3):

Observable.Create<string>(observer =>
{
    var queue = new Queue<long>(3);

    return Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(value =>
    {
        if (queue.Count == 3)
        {
            queue.Dequeue();
        }
        queue.Enqueue(value);
        if (queue.Count == 3)
        {
            var values = queue.ToArray();
            if (values.All(v => v > 3))
            {
                observer.OnNext("All above 3. " + values[0] + " " + values[1] + " " + values[2]);
            }
        }
    });
})

相关内容

  • 没有找到相关文章

最新更新