[这个问题属于反应式扩展(Rx)的领域]
需要在应用程序重启时继续的订阅
int nValuesBeforeOutput = 123;
myStream.Buffer(nValuesBeforeOutput).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
现在,我需要序列化和反序列化此订阅的状态,以便下次启动应用程序时,缓冲区计数不会从零开始,而是从应用程序退出之前缓冲区计数达到的任何内容开始。
- 在这种情况下,您如何保持 IObservable.Subscribe() 的状态并在以后加载它?
- 是否有在 Rx 中保存观察者状态的通用解决方案?
从答案到解决方案
基于 Paul Betts 方法,这是一个在我的初始测试中起作用的半通用实现
用
int nValuesBeforeOutput = 123;
var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
扩展方法
private static bool _alreadyRecording;
public static IObservable<T> Record<T>(this IObservable<T> input,
IRepositor repositor)
{
IObservable<T> output = input;
List<T> records = null;
if (repositor.Deserialize(ref records))
{
ISubject<T> history = new ReplaySubject<T>();
records.ForEach(history.OnNext);
output = input.Merge(history);
}
if (!_alreadyRecording)
{
_alreadyRecording = true;
input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
}
return output;
}
public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
IRepositor repositor)
{
input.Subscribe(i => repositor.Clear());
return input;
}
笔记
- 这不适用于存储依赖于生成的值之间的时间间隔的状态
- 您需要一个支持序列化 T 的序列化程序实现
- 如果您多次订阅
myRecordableStream
,则需要_alreadyRecording
-
_alreadyRecording
是一个静态布尔值,非常丑陋,如果需要并行订阅,可以防止扩展方法在多个地方使用 - 需要重新实现以供将来使用
对此没有通用的解决方案,制作一个解决方案将是不平凡™的。你能做的最接近的事情是使myStream成为某种可观察的重播(即不是序列化状态,而是序列化myStream的状态并重做工作以让你回到原来的位置)。