我一直在使用.NET Reactive Extensions来观察日志事件。我目前正在使用一个从IObservable派生的类,该类使用ReplaySubject来存储日志,这样我就可以过滤和回放日志(例如:显示所有错误日志,或显示所有详细日志),而不会丢失我缓冲的日志。
问题是,尽管我已经在这个主题上设置了缓冲区大小:
this.subject = new ReplaySubject<LogEvent>(10);
当我使用OnNext添加到无限循环上的可观察集合时,我的程序的内存使用率达到了极限:
internal void WatchForNewEvents()
{
Task.Factory.StartNew(() =>
{
while (true)
{
dynamic parameters = new ExpandoObject();
// TODO: Add parameters for getting specific log events
if (this.logEventRepository.GetManyHasNewResults(parameters))
{
foreach (var recentEvent in this.logEventRepository.GetMany(parameters))
{
this.subject.OnNext(recentEvent);
}
}
// Commented this out for now to really see the memory go up
// Thread.Sleep(1000);
}
});
}
ReplaySubject上的缓冲区大小不起作用吗?当达到缓冲区大小时,它似乎并没有清除缓冲区。非常感谢您的帮助!
更新:
我添加这样的订阅者(这是错误的吗?):
public IDisposable Subscribe(IObserver<LogEvent> observer)
{
return this.subject.Subscribe(observer);
}
它被称为:
// Inserts into UI ListView
this.logEventObservable.Subscribe(evt => this.InsertNewLogEvent(evt));
我不确定这是否是最终的答案,但我怀疑您遇到了问题,因为您使用的调度器存在并发性。您在ReplaySubject
上调用的构造函数如下所示:
public ReplaySubject(int bufferSize)
: this(bufferSize, TimeSpan.MaxValue, Scheduler.CurrentThread)
{ }
Scheduler.CurrentThread
让我担心。试着把它改成Scheduler.ThreadPool
,看看这是否有帮助。
此外,顺便说一句,你似乎把Rx和TPL以及老式的线程睡眠混合在一起。通常最好避免那样做。您可以将WatchForNewEvents
代码更改为如下所示:
dynamic parameters = new ExpandoObject();
var newEvents =
from n in Observable.Interval(TimeSpan.FromSeconds(1.0))
where this.logEventRepository.GetManyHasNewResults(parameters)
from recentEvent in
this.logEventRepository.GetMany(parameters).ToObservable()
select recentEvent;
newEvents.Subscribe(this.subject);
这是一种很好的紧凑的Rx-y做事方式。