正在为时移序列恢复1分钟前的数据



此类累积值+在当前时刻知道当前和与1分钟前的和之间的差。它的客户端以这样的方式使用它:为每个传入的数据块添加新的值并获得差异。现在,恢复它的状态出现了问题。假设应用程序被回收,泵中前一分钟的数据丢失,回收后的第一分钟Change将等于0,所以我必须等待一分钟才能计算出差异。如何修复?

public class ChangeEstimator
{
    private int sum;
    private Subject<int> sumPump;
    private IConnectableObservable<int> hotSumPump;
    public int Sum
    {
        get
        {
            return sum;
        }
        private set
        {
            sum = value;
            sumPump.OnNext(value);
        }
    }
    public int Change { get; private set; }
    public void Start()
    {
        sumPump = new Subject<int>();
        hotSumPump = sumPump.Publish();
        var changePeriod = TimeSpan.FromMinutes(1);
        hotSumPump.Delay(changePeriod)
                  .Subscribe(value =>
                  {
                      Change = Sum - value;
                  });
        hotSumPump.Connect();
    }
    public void AddNewValue(int newValue)
    {
        Sum += newValue;
    }
}

更新

在下面的代码中,您可以看到解释。客户端订阅事务流,并在每次新事务中更新估计器。客户端还公开了IObservable快照源,它将数据快照推送给侦听器,侦听器可以是UI或数据库。问题是,当回收发生时,UI显示的不是真正的Change,而是0。如果这个问题对于Stackoverflow来说太具体了,请原谅我。有人建议我使用RabbitMQ来保持更改的持久性。你认为它能解决这个问题吗?

public class Transaction
{
    public int Price { get; set; }
}
public class AlgorithmResult
{
    public int Change { get; set; }
}
public interface ITransactionProvider
{
    IObservable<Transaction> TransactionStream { get; }
}
public class Client
{
    private ChangeEstimator estimator = new ChangeEstimator();
    private ITransactionProvider transactionProvider;
    public Client(ITransactionProvider transactionProvider)
    {
        this.transactionProvider = transactionProvider;
    }
    public void Init()
    {
        transactionProvider.TransactionStream.Subscribe(t =>
        {
            estimator.AddNewValue(t.Price);
        });
    }
    public IObservable<AlgorithmResult> CreateSnaphotsTimedSource(int periodSeconds)
    {
        return Observable
            .Interval(TimeSpan.FromSeconds(periodSeconds))
            .Select(_ =>
            {
                AlgorithmResult snapshot;
                snapshot = new AlgorithmResult
                {
                    Change = estimator.Change
                };
                return snapshot;
            })
            .Where(snapshot => snapshot != null);
    }
}

您的应用程序被重新启动,并且没有以前的内存(双关语)。没有任何Rx技巧(在此应用程序中)可以帮助您。

如前所述,您应该弄清楚业务需求,并在启动期间考虑状态初始化。为了实现队列,您可能需要考虑通过I/O源存储最新状态,或者在消息发送方和使用者之间分离应用程序逻辑。

我必须回答自己的问题,因为我从某人那里得到了答案,这在我的情况下很有效。我同意正确的答案取决于商业逻辑,我想我已经尽可能清楚地解释了。

因此,在这里,处理可能的应用程序回收的正确方法是将类ChangeEstimator放在外部进程中,并与其交换消息。我使用AMQP向估计器(RabbitMQ)发送消息。这里的关键点是,与包含其余进程的web应用程序相比,外部进程关闭/回收的风险非常小。

相关内容

  • 没有找到相关文章

最新更新