将可观察数据聚合到多个桶中



我需要一个关于如何分发聚合更新的好主意…

假设我有一个IObservable的Id和一个值产生一个永无止境的消息流(5-10,000/秒)。现在我想计算很多聚合(例如sum)以固定的时间间隔分发到其他系统—假设每个聚合每10秒一次。聚合基于Tuple(字符串)的Id,但可能会落入多个聚合(聚合定义应该包含哪些Id -因此会重叠)。

将有几千个聚合定义,所以有人对如何解决这个问题有任何想法吗?

在概念上

:

public struct Update
{
    public string Id { get; }
    public int Value { get; }
}
public class Aggregate
{
    Dictionary<string, Update> latestValues = new Dictionary<string, Update>();
    public void AddUpdate(Update update)
    {
        latestValues[update.Id] = update;
    }
    public int CalculateSum()
    {
        return latestValues.Values.Select(v => v.Value).Sum();
    }
}
更新:

这个问题的目的是简化实际问题——也许我没有做得那么好——对不起。假设我有多个物联网设备产生温度,并定期报告该温度(更新流)。不同的用户可以选择查看聚合(例如:设备子集的平均值。因此,一个客户可能希望看到设备1、2和3的平均值,而另一个客户可能希望看到设备2、3和4等的平均值(聚合定义)

我想你问的是如何使用Rx创建实时读取模型。

根据我对你的问题的猜测,我认为你希望能够通过每个更新消息更新一些当前状态。在你的CalculateSum方法的情况下,你不能只是求和所有消息的Value属性,因为有些将被用来更新/覆盖一个现有的值。

所以在这个假设下,看起来GroupBy会是你的朋友。如果你先把可观察的值序列分成子序列,你就可以把这个问题分解并解决了。

input.GroupBy(i=>i.Id)

如果我们只考虑属于相同Id的单个值流,每个值的和应该是什么?

-1--1--2-

在这个简单的例子中,答案总是直接传递的值。例如

input  -1--1--2-
result -1--1--2-
然而,当我们看两个产生值的序列时,计算 就有点困难了
inputA  -1-1-2--------
inputB  --1-2-2-3-5-2-
result  -122344-5-7-4-

这里我们需要查看序列中每个值的增量是什么,并将该增量推入结果。可以这样可视化

inputA  -1-1-2--------
 delta  -1-0-1--------
inputB  --1-2-2-3-5-2-
 delta  --1-1-0-1-2-(-3)-
result  -122344-5-7-4-

要创建这种投影你可以这样写

input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue }))
    .Select(acc => acc.Delta);

把它们放在一起,代码看起来像这样:

void Main()
{
    var testScheduler = new TestScheduler();
    var input = testScheduler.CreateColdObservable<Update>(
        ReactiveTest.OnNext(010, new Update("a", 1)),       //1
        ReactiveTest.OnNext(020, new Update("b", 1)),       //2
        ReactiveTest.OnNext(030, new Update("c", 3)),       //5
        ReactiveTest.OnNext(040, new Update("a", 1)),       //5
        ReactiveTest.OnNext(050, new Update("b", 2)),       //6
        ReactiveTest.OnNext(060, new Update("a", 2)),       //7
        ReactiveTest.OnNext(070, new Update("b", 2)),       //7
        ReactiveTest.OnNext(080, new Update("b", 3)),       //8
        ReactiveTest.OnNext(090, new Update("b", 5)),       //10
        ReactiveTest.OnNext(100, new Update("b", 2))        //7
    );
    var currentSum = input.GroupBy(i => i.Id)
        .SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
        .Select(acc => acc.Delta)
        .Scan((acc, cur) => acc + cur);
    var observer = testScheduler.CreateObserver<int>();
    var subscription = currentSum.Subscribe(observer);
    testScheduler.Start();
    subscription.Dispose();
    ReactiveAssert.AreElementsEqual(new[]
        {
            ReactiveTest.OnNext(010, 1),
            ReactiveTest.OnNext(020, 2),
            ReactiveTest.OnNext(030, 5),
            ReactiveTest.OnNext(040, 5),
            ReactiveTest.OnNext(050, 6),
            ReactiveTest.OnNext(060, 7),
            ReactiveTest.OnNext(070, 7),
            ReactiveTest.OnNext(080, 8),
            ReactiveTest.OnNext(090, 10),
            ReactiveTest.OnNext(100, 7)}
        ,
        observer.Messages);
}
// Define other methods and classes here
public struct Update
{
    public Update(string id, int value)
    {
        Id = id;
        Value = value;
    }
    public string Id { get; }
    public int Value { get; }
}

如果你想创建多个聚合,那么每个新的聚合就像上面的查询一样。您可以通过在序列分组后共享/发布序列来进行优化,但我首先要确保这是分析所需要的。

* CQRS/ES术语中的Readmodels

相关内容

  • 没有找到相关文章

最新更新