此类累积值+在当前时刻知道当前和与1分钟前的和之间的差。它的客户端以这样的方式使用它:为每个传入的数据块添加新的值并获得差异。现在,恢复它的状态出现了问题。假设应用程序被回收,泵中前一分钟的数据丢失,回收后的第一分钟Change
将等于0
,所以我必须等待一分钟才能计算出差异。如何修复?
public class ChangeEstimator
{
private int sum;
private Subject<int> sumPump;
private IConnectableObservable<int> hotSumPump;
public int Sum
{
get
{
return sum;
}
private set
{
sum = value;
sumPump.OnNext(value);
}
}
public int Change { get; private set; }
public void Start()
{
sumPump = new Subject<int>();
hotSumPump = sumPump.Publish();
var changePeriod = TimeSpan.FromMinutes(1);
hotSumPump.Delay(changePeriod)
.Subscribe(value =>
{
Change = Sum - value;
});
hotSumPump.Connect();
}
public void AddNewValue(int newValue)
{
Sum += newValue;
}
}
更新
在下面的代码中,您可以看到解释。客户端订阅事务流,并在每次新事务中更新估计器。客户端还公开了IObservable
快照源,它将数据快照推送给侦听器,侦听器可以是UI或数据库。问题是,当回收发生时,UI显示的不是真正的Change,而是0。如果这个问题对于Stackoverflow来说太具体了,请原谅我。有人建议我使用RabbitMQ来保持更改的持久性。你认为它能解决这个问题吗?
public class Transaction
{
public int Price { get; set; }
}
public class AlgorithmResult
{
public int Change { get; set; }
}
public interface ITransactionProvider
{
IObservable<Transaction> TransactionStream { get; }
}
public class Client
{
private ChangeEstimator estimator = new ChangeEstimator();
private ITransactionProvider transactionProvider;
public Client(ITransactionProvider transactionProvider)
{
this.transactionProvider = transactionProvider;
}
public void Init()
{
transactionProvider.TransactionStream.Subscribe(t =>
{
estimator.AddNewValue(t.Price);
});
}
public IObservable<AlgorithmResult> CreateSnaphotsTimedSource(int periodSeconds)
{
return Observable
.Interval(TimeSpan.FromSeconds(periodSeconds))
.Select(_ =>
{
AlgorithmResult snapshot;
snapshot = new AlgorithmResult
{
Change = estimator.Change
};
return snapshot;
})
.Where(snapshot => snapshot != null);
}
}
您的应用程序被重新启动,并且没有以前的内存(双关语)。没有任何Rx技巧(在此应用程序中)可以帮助您。
如前所述,您应该弄清楚业务需求,并在启动期间考虑状态初始化。为了实现队列,您可能需要考虑通过I/O源存储最新状态,或者在消息发送方和使用者之间分离应用程序逻辑。
我必须回答自己的问题,因为我从某人那里得到了答案,这在我的情况下很有效。我同意正确的答案取决于商业逻辑,我想我已经尽可能清楚地解释了。
因此,在这里,处理可能的应用程序回收的正确方法是将类ChangeEstimator
放在外部进程中,并与其交换消息。我使用AMQP向估计器(RabbitMQ)发送消息。这里的关键点是,与包含其余进程的web应用程序相比,外部进程关闭/回收的风险非常小。